diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c
index 97ff2886fcb95fcfaca19ba4baf70b64160855ca..943fcbdb5329c4c53e9f4c663497419091f8b4d2 100644
--- a/examples/c/stream_demo.c
+++ b/examples/c/stream_demo.c
@@ -25,7 +25,7 @@ int32_t init_env() {
return -1;
}
- TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
+ TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
@@ -82,7 +82,7 @@ int32_t create_stream() {
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(
- pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)");
+ pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)");
if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1;
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 82674c61157b2e921530a3a6be09d696d8f1d65c..db8d3ac033636adba4d4807897adea5b1e4bd1d5 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -398,6 +398,8 @@ typedef struct {
int8_t inputStatus;
} SStreamTaskRecoverRsp;
+int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
+
int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
int32_t streamTaskRun(SStreamTask* pTask);
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 3de5109a1ad1d181003b0134c32fc32bfa6e9e93..2b5e18c1dbe71d9fdd5288f3db1d745926a49115 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -421,10 +421,20 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
- SStreamDispatchReq* pReq = pMsg->pCont;
- int32_t taskId = pReq->taskId;
- SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
- streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
+ char* msgStr = pMsg->pCont;
+ char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
+ int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
+ SStreamDispatchReq req;
+ SDecoder decoder;
+ tDecoderInit(&decoder, msgBody, msgLen);
+ tDecodeStreamDispatchReq(&decoder, &req);
+ int32_t taskId = req.taskId;
+ SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
+ SRpcMsg rsp = {
+ .info = pMsg->info,
+ .code = 0,
+ };
+ streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, &req, &rsp);
return 0;
}
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index d89f2ed57d5289997b67387ec7aafd761cb1fd8d..99c61e1479888b43f8e1d650c4309e6ee8c6f2be 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -57,12 +57,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
}
// rsp by input status
- SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
+ void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
+ ((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg);
+ SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = status;
pCont->streamId = pReq->streamId;
pCont->taskId = pReq->sourceTaskId;
- pRsp->pCont = pCont;
- pRsp->contLen = sizeof(SStreamDispatchRsp);
+ pRsp->pCont = buf;
+ pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}
@@ -87,8 +89,12 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
}
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
+ ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
+ int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
+ ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
// TODO: init recover timer
+ return 0;
}
// continue dispatch
streamSink1(pTask, pMsgCb);
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 4b88cf503e19976168a689f2f921d5bb6ea33564..72df516e0d63222e6ebab82a086b9116e7c62432 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -43,6 +43,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
if (output == NULL) break;
// TODO: do we need free memory?
SSDataBlock* outputCopy = createOneDataBlock(output, true);
+ outputCopy->info.childId = pTask->childId;
taosArrayPush(pRes, outputCopy);
}
return 0;
diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c
index 9f22bbbe8af3ccd78bdb1fd113201906808917c7..769d672042eea6afbb26945fcfdc1af1cb045836 100644
--- a/source/libs/stream/src/streamMsg.c
+++ b/source/libs/stream/src/streamMsg.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "tstream.h"
+#include "streamInc.h"
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
@@ -147,13 +147,13 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
if (code < 0) goto FAIL;
+ code = -1;
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
- code = -1;
goto FAIL;
}
- ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
+ ((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
@@ -165,16 +165,24 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
pMsg->contLen = tlen + sizeof(SMsgHead);
pMsg->pCont = buf;
+ pMsg->msgType = pTask->dispatchMsgType;
code = 0;
FAIL:
- if (buf) taosMemoryFree(buf);
+ if (code < 0 && buf) rpcFreeCont(buf);
if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree);
if (req.dataLen) taosArrayDestroy(req.dataLen);
return code;
}
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) {
+#if 0
+ int8_t old =
+ atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
+ if (old != TASK_OUTPUT_STATUS__NORMAL) {
+ return 0;
+ }
+#endif
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0};
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {
@@ -201,12 +209,19 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* dat
tmsgSendReq(pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
- // TODO
- ASSERT(0);
+ SRpcMsg dispatchMsg = {0};
+ SEpSet* pEpSet = NULL;
+ if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
+ ASSERT(0);
+ return -1;
+ }
+
+ tmsgSendReq(pEpSet, &dispatchMsg);
}
return 0;
}
+#if 0
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamTaskExecReq req = {
.streamId = pTask->streamId,
@@ -287,3 +302,4 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
}
return 0;
}
+#endif
diff --git a/source/libs/stream/src/streamSink.c b/source/libs/stream/src/streamSink.c
index 6fd0a0051757d496c74fad8e390021137196a560..35bebe0e63e15aabd0d74a8bacdc9a134037ce78 100644
--- a/source/libs/stream/src/streamSink.c
+++ b/source/libs/stream/src/streamSink.c
@@ -13,8 +13,7 @@
* along with this program. If not, see .
*/
-#include "executor.h"
-#include "tstream.h"
+#include "streamInc.h"
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
SStreamQueue* queue;
@@ -23,12 +22,13 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
} else {
queue = pTask->outputQueue;
}
+
/*if (streamDequeueBegin(queue) == true) {*/
/*return -1;*/
/*}*/
- if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA) {
- ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
+ if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA ||
+ pTask->dispatchType != TASK_DISPATCH__NONE) {
while (1) {
SStreamDataBlock* pBlock = streamQueueNextItem(queue);
if (pBlock == NULL) break;
@@ -36,13 +36,18 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
// local sink
if (pTask->sinkType == TASK_SINK__TABLE) {
+ ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
} else if (pTask->sinkType == TASK_SINK__SMA) {
+ ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
}
+ // TODO: sink and dispatch should be only one
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(queue == pTask->outputQueue);
+ ASSERT(pTask->sinkType == TASK_SINK__NONE);
+
streamDispatch(pTask, pMsgCb, pBlock);
}