Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
88313b90
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
88313b90
编写于
6月 09, 2022
作者:
L
Liu Jicong
提交者:
GitHub
6月 09, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13619 from taosdata/feature/stream
fix(query): decode
上级
e3e6f783
9e4b3adc
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
58 addition
and
190 deletion
+58
-190
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+1
-0
source/client/src/tmq.c
source/client/src/tmq.c
+3
-178
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+26
-5
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+2
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+3
-3
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+13
-2
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+8
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+1
-0
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
88313b90
...
...
@@ -339,6 +339,7 @@ typedef struct {
int32_t
sourceTaskId
;
int32_t
sourceVg
;
int32_t
sourceChildId
;
int32_t
upstreamNodeId
;
#if 0
int64_t sourceVer;
#endif
...
...
source/client/src/tmq.c
浏览文件 @
88313b90
...
...
@@ -494,7 +494,6 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
tmqAskEp
(
tmq
,
true
);
taosTmrReset
(
tmqAssignDelayedHbTask
,
1000
,
tmq
,
tmqMgmt
.
timer
,
&
tmq
->
hbTimer
);
}
else
if
(
*
pTaskType
==
TMQ_DELAYED_TASK__COMMIT
)
{
/*tmq_commit(tmq, NULL, true);*/
tmqCommitInner
(
tmq
,
NULL
,
1
,
1
,
tmq
->
commitCb
,
tmq
->
commitCbUserParam
);
taosTmrReset
(
tmqAssignDelayedCommitTask
,
tmq
->
autoCommitInterval
,
tmq
,
tmqMgmt
.
timer
,
&
tmq
->
commitTimer
);
}
else
if
(
*
pTaskType
==
TMQ_DELAYED_TASK__REPORT
)
{
...
...
@@ -667,94 +666,6 @@ FAIL:
tmq_resp_err_t
tmq_commit
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
int32_t
async
)
{
return
tmqCommitInner
(
tmq
,
offsets
,
0
,
async
,
tmq
->
commitCb
,
tmq
->
commitCbUserParam
);
#if 0
// TODO: add read write lock
SRequestObj* pRequest = NULL;
tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
// build msg
// send to mnode
SMqCMCommitOffsetReq req;
SArray* pOffsets = NULL;
if (offsets == NULL) {
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqOffset offset;
strcpy(offset.topicName, pTopic->topicName);
strcpy(offset.cgroup, tmq->groupId);
offset.vgId = pVg->vgId;
offset.offset = pVg->currentOffset;
taosArrayPush(pOffsets, &offset);
}
}
req.num = pOffsets->size;
req.offsets = pOffsets->pData;
} else {
req.num = taosArrayGetSize(&offsets->container);
req.offsets = (SMqOffset*)offsets->container.pData;
}
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos;
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
tEncoderClear(&encoder);
return -1;
}
tEncoderClear(&encoder);
tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tEncoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
if (pRequest == NULL) {
tscError("failed to malloc request");
}
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
if (pParam == NULL) {
return -1;
}
pParam->tmq = tmq;
tsem_init(&pParam->rspSem, 0, 0);
pParam->async = async;
pParam->offsets = pOffsets;
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqCommitCb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
resp = pParam->rspErr;
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
if (pOffsets) {
taosArrayDestroy(pOffsets);
}
}
return resp;
#endif
}
tmq_resp_err_t
tmq_subscribe
(
tmq_t
*
tmq
,
const
tmq_list_t
*
topic_list
)
{
...
...
@@ -859,93 +770,6 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para
conf
->
commitCbUserParam
=
param
;
}
#if 0
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL;
SQuery* pQueryNode = NULL;
char* astStr = NULL;
int32_t sqlLen;
terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || streamName == NULL || sql == NULL) {
tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
sqlLen = strlen(sql);
if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
goto _return;
}
tscDebug("start to create stream: %s", streamName);
int32_t code = 0;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
/*printf("%s\n", pStr);*/
SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
strcpy(name.dbname, pRequest->pDb);
strcpy(name.tname, streamName);
SCMCreateStreamReq req = {
.igExists = 1,
.ast = astStr,
.sql = (char*)sql,
};
tNameExtractFullName(&name, req.name);
strcpy(req.targetStbFullName, tbName);
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
goto _return;
}
tSerializeSCMCreateStreamReq(buf, tlen, &req);
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
pRequest->type = TDMT_MND_CREATE_STREAM;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
_return:
taosMemoryFreeClear(astStr);
qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno;
}
return pRequest;
}
#endif
#if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0;
...
...
@@ -1540,10 +1364,11 @@ const char* tmq_get_table_name(TAOS_RES* res) {
}
return
NULL
;
}
DLL_EXPORT
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
tmq_commit_cb
*
cb
,
void
*
param
)
{
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
tmq_commit_cb
*
cb
,
void
*
param
)
{
tmqCommitInner
(
tmq
,
offsets
,
0
,
1
,
cb
,
param
);
}
DLL_EXPORT
tmq_resp_err_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
)
{
tmq_resp_err_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
)
{
return
tmqCommitInner
(
tmq
,
offsets
,
0
,
0
,
NULL
,
NULL
);
}
source/common/src/tdatablock.c
浏览文件 @
88313b90
...
...
@@ -684,7 +684,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
*/
size_t
blockDataGetSerialMetaSize
(
uint32_t
numOfCols
)
{
// | total rows/total length | block group id | column schema | each column length |
return
sizeof
(
int32_t
)
+
sizeof
(
uint64_t
)
+
numOfCols
*
(
sizeof
(
int16_t
)
+
sizeof
(
int32_t
))
+
numOfCols
*
sizeof
(
int32_t
);
return
sizeof
(
int32_t
)
+
sizeof
(
uint64_t
)
+
numOfCols
*
(
sizeof
(
int16_t
)
+
sizeof
(
int32_t
))
+
numOfCols
*
sizeof
(
int32_t
);
}
double
blockDataGetSerialRowSize
(
const
SSDataBlock
*
pBlock
)
{
...
...
@@ -1893,12 +1894,12 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
uint64_t
*
groupId
=
(
uint64_t
*
)
data
;
data
+=
sizeof
(
uint64_t
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
*
((
int16_t
*
)
data
)
=
pColInfoData
->
info
.
type
;
*
((
int16_t
*
)
data
)
=
pColInfoData
->
info
.
type
;
data
+=
sizeof
(
int16_t
);
*
((
int32_t
*
)
data
)
=
pColInfoData
->
info
.
bytes
;
*
((
int32_t
*
)
data
)
=
pColInfoData
->
info
.
bytes
;
data
+=
sizeof
(
int32_t
);
}
...
...
@@ -1944,6 +1945,8 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
const
char
*
blockCompressDecode
(
SSDataBlock
*
pBlock
,
int32_t
numOfCols
,
int32_t
numOfRows
,
const
char
*
pData
)
{
blockDataEnsureCapacity
(
pBlock
,
numOfRows
);
pBlock
->
info
.
rows
=
numOfRows
;
const
char
*
pStart
=
pData
;
int32_t
dataLen
=
*
(
int32_t
*
)
pStart
;
...
...
@@ -1952,13 +1955,25 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
pBlock
->
info
.
groupId
=
*
(
uint64_t
*
)
pStart
;
pStart
+=
sizeof
(
uint64_t
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
if
(
pBlock
->
pDataBlock
==
NULL
)
{
pBlock
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
taosArraySetSize
(
pBlock
->
pDataBlock
,
numOfCols
);
}
pBlock
->
info
.
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
ASSERT
(
pBlock
->
pDataBlock
->
size
>=
numOfCols
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
pColInfoData
->
info
.
type
=
*
(
int16_t
*
)
pStart
;
pStart
+=
sizeof
(
int16_t
);
pColInfoData
->
info
.
bytes
=
*
(
int32_t
*
)
pStart
;
pStart
+=
sizeof
(
int32_t
);
if
(
IS_VAR_DATA_TYPE
(
pColInfoData
->
info
.
type
))
{
pBlock
->
info
.
hasVarCol
=
true
;
}
}
blockDataEnsureCapacity
(
pBlock
,
numOfRows
);
...
...
@@ -1983,11 +1998,17 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
pColInfoData
->
pData
=
taosMemoryMalloc
(
colLen
[
i
]);
}
}
else
{
if
(
pColInfoData
->
nullbitmap
==
NULL
)
{
pColInfoData
->
nullbitmap
=
taosMemoryCalloc
(
1
,
BitmapLen
(
numOfRows
));
}
memcpy
(
pColInfoData
->
nullbitmap
,
pStart
,
BitmapLen
(
numOfRows
));
pStart
+=
BitmapLen
(
numOfRows
);
}
if
(
colLen
[
i
]
>
0
)
{
if
(
pColInfoData
->
pData
==
NULL
)
{
pColInfoData
->
pData
=
taosMemoryCalloc
(
1
,
colLen
[
i
]);
}
memcpy
(
pColInfoData
->
pData
,
pStart
,
colLen
[
i
]);
}
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
88313b90
...
...
@@ -352,7 +352,9 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DEPLOY
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RUN
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DISPATCH
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DISPATCH_RSP
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RECOVER
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_RECOVER_RSP
,
vmPutMsgToFetchQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_REPLICA
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIG
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
88313b90
...
...
@@ -448,7 +448,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t
tqProcessTaskDispatchRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SStreamDispatchRsp
*
pRsp
=
pMsg
->
pCont
;
SStreamDispatchRsp
*
pRsp
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
))
;
int32_t
taskId
=
pRsp
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
streamProcessDispatchRsp
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pRsp
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
88313b90
...
...
@@ -737,8 +737,8 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
static
bool
prepareDataScan
(
SStreamBlockScanInfo
*
pInfo
)
{
SSDataBlock
*
pSDB
=
pInfo
->
pUpdateRes
;
STimeWindow
win
=
{
.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
,
.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
,
};
bool
needRead
=
false
;
if
(
!
isStateWindow
(
pInfo
)
&&
pInfo
->
updateResIndex
<
pSDB
->
info
.
rows
)
{
...
...
@@ -846,7 +846,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
size_t
total
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
if
(
pInfo
->
blockType
==
STREAM_DATA_TYPE_SSDATA_BLOCK
)
{
if
(
pInfo
->
validBlockIndex
>=
total
)
{
doClearBufferedBlocks
(
pInfo
);
/*doClearBufferedBlocks(pInfo);*/
pOperator
->
status
=
OP_EXEC_DONE
;
return
NULL
;
}
...
...
source/libs/stream/src/stream.c
浏览文件 @
88313b90
...
...
@@ -59,7 +59,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
// rsp by input status
void
*
buf
=
rpcMallocCont
(
sizeof
(
SMsgHead
)
+
sizeof
(
SStreamDispatchRsp
));
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pReq
->
sourceVg
);
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pReq
->
upstreamNodeId
);
SStreamDispatchRsp
*
pCont
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
pCont
->
inputStatus
=
status
;
pCont
->
streamId
=
pReq
->
streamId
;
...
...
@@ -78,7 +78,18 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
streamExec
(
pTask
,
pMsgCb
);
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
streamExec
(
pTask
,
pMsgCb
);
}
else
{
ASSERT
(
pTask
->
sinkType
!=
TASK_SINK__NONE
);
while
(
1
)
{
void
*
data
=
streamQueueNextItem
(
pTask
->
inputQueue
);
if
(
data
==
NULL
)
return
0
;
if
(
streamTaskOutput
(
pTask
,
data
)
<
0
)
{
ASSERT
(
0
);
}
}
}
// 3. handle output
// 3.1 check and set status
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
88313b90
...
...
@@ -22,6 +22,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceTaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceVg
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceChildId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
upstreamNodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
blockNum
)
<
0
)
return
-
1
;
ASSERT
(
taosArrayGetSize
(
pReq
->
data
)
==
pReq
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pReq
->
dataLen
)
==
pReq
->
blockNum
);
...
...
@@ -42,6 +43,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceTaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceVg
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceChildId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
upstreamNodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
blockNum
)
<
0
)
return
-
1
;
ASSERT
(
pReq
->
blockNum
>
0
);
pReq
->
data
=
taosArrayInit
(
pReq
->
blockNum
,
sizeof
(
void
*
));
...
...
@@ -94,6 +96,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
.
sourceTaskId
=
pTask
->
taskId
,
.
sourceVg
=
data
->
sourceVg
,
.
sourceChildId
=
pTask
->
childId
,
.
upstreamNodeId
=
pTask
->
nodeId
,
.
blockNum
=
blockNum
,
};
...
...
@@ -184,13 +187,17 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
#endif
SStreamDataBlock
*
pBlock
=
streamQueueNextItem
(
pTask
->
outputQueue
);
if
(
pBlock
==
NULL
)
return
0
;
if
(
pBlock
==
NULL
)
{
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
return
0
;
}
ASSERT
(
pBlock
->
type
==
STREAM_DATA_TYPE_SSDATA_BLOCK
);
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildDispatchMsg
(
pTask
,
pBlock
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
atomic_store_8
(
&
pTask
->
outputStatus
,
TASK_OUTPUT_STATUS__NORMAL
);
return
-
1
;
}
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
88313b90
...
...
@@ -65,6 +65,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
}
qRes
->
type
=
STREAM_INPUT__DATA_BLOCK
;
qRes
->
blocks
=
pRes
;
/*qRes->sourceVg = pTask->nodeId;*/
if
(
streamTaskOutput
(
pTask
,
qRes
)
<
0
)
{
streamQueueProcessFail
(
pTask
->
inputQueue
);
taosArrayDestroy
(
pRes
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录