diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index ae21986c56fb30f07438d33dd93dfecc0d88503e..72ec01798037eb85e06c9cb48180f2990f403ed2 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -2574,6 +2574,14 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
}
+typedef struct {
+ void* data;
+} SStreamDispatchReq;
+
+typedef struct {
+ int8_t status;
+} SStreamDispatchRsp;
+
#define TD_AUTO_CREATE_TABLE 0x1
typedef struct {
int64_t suid;
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 56e6a39ce8e45af9894e47144afd327cbae28885..796fee4f89c231b11ab3c8fa20bcd55908c950e1 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include "os.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
@@ -29,8 +30,22 @@ extern "C" {
typedef struct SStreamTask SStreamTask;
enum {
- STREAM_TASK_STATUS__RUNNING = 1,
- STREAM_TASK_STATUS__STOP,
+ TASK_STATUS__IDLE = 1,
+ TASK_STATUS__EXECUTING,
+ TASK_STATUS__CLOSING,
+};
+
+enum {
+ TASK_INPUT_STATUS__NORMAL = 1,
+ TASK_INPUT_STATUS__BLOCKED,
+ TASK_INPUT_STATUS__RECOVER,
+ TASK_INPUT_STATUS__STOP,
+};
+
+enum {
+ TASK_OUTPUT_STATUS__NORMAL = 1,
+ TASK_OUTPUT_STATUS__WAIT,
+ TASK_OUTPUT_STATUS__BLOCKED,
};
enum {
@@ -38,10 +53,64 @@ enum {
STREAM_CREATED_BY__SMA,
};
+enum {
+ STREAM_INPUT__DATA_SUBMIT = 1,
+ STREAM_INPUT__DATA_BLOCK,
+ STREAM_INPUT__CHECKPOINT,
+};
+
typedef struct {
- int32_t nodeId; // 0 for snode
- SEpSet epSet;
-} SStreamTaskEp;
+ int8_t type;
+
+ int32_t sourceVg;
+ int64_t sourceVer;
+
+ int32_t* dataRef;
+ SSubmitReq* data;
+} SStreamDataSubmit;
+
+typedef struct {
+ int8_t type;
+
+ int32_t sourceVg;
+ int64_t sourceVer;
+
+ SArray* blocks; // SArray
+} SStreamDataBlock;
+
+typedef struct {
+ int8_t type;
+} SStreamCheckpoint;
+
+static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
+ SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosMemoryCalloc(1, sizeof(SStreamDataSubmit));
+ if (pDataSubmit == NULL) return NULL;
+ pDataSubmit->data = pReq;
+ pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
+ if (pDataSubmit->data == NULL) goto FAIL;
+ *pDataSubmit->dataRef = 1;
+ return pDataSubmit;
+FAIL:
+ taosMemoryFree(pDataSubmit);
+ return NULL;
+}
+
+static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
+ //
+ atomic_add_fetch_32(pDataSubmit->dataRef, 1);
+}
+
+static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
+ int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
+ ASSERT(ref >= 0);
+ if (ref == 0) {
+ taosMemoryFree(pDataSubmit->data);
+ taosMemoryFree(pDataSubmit->dataRef);
+ }
+}
+
+int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
+void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
typedef struct {
void* inputHandle;
@@ -122,9 +191,15 @@ enum {
TASK_SINK__FETCH,
};
+enum {
+ TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,
+ TASK_INPUT_TYPE__DATA_BLOCK,
+};
+
struct SStreamTask {
int64_t streamId;
int32_t taskId;
+ int8_t inputType;
int8_t status;
int8_t sourceType;
@@ -155,9 +230,11 @@ struct SStreamTask {
STaskDispatcherShuffle shuffleDispatcher;
};
- // msg buffer
- int32_t memUsed;
+ int8_t inputStatus;
+ int8_t outputStatus;
+
STaosQueue* inputQ;
+ STaosQueue* outputQ;
// application storage
void* ahandle;
@@ -199,10 +276,16 @@ typedef struct {
SArray* res; // SArray
} SStreamSinkReq;
-int32_t streamEnqueueData(SStreamTask* pTask, const void* input, int32_t inputType);
+int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
+int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
+int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId);
+int32_t streamTaskExecNew(SStreamTask* pTask);
+
+int32_t streamTaskHandleInput(SStreamTask* pTask, void* data);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/os/osAtomic.h b/include/os/osAtomic.h
index e2a122a0fe4f220f00576941906e74b1e8036339..8600992d68050d3ba80767cf687c6b21b4a23c77 100644
--- a/include/os/osAtomic.h
+++ b/include/os/osAtomic.h
@@ -23,92 +23,92 @@ extern "C" {
// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following section.
#ifndef ALLOW_FORBID_FUNC
- #define __atomic_load_n __ATOMIC_LOAD_N_FUNC_TAOS_FORBID
- #define __atomic_store_n __ATOMIC_STORE_N_FUNC_TAOS_FORBID
- #define __atomic_exchange_n __ATOMIC_EXCHANGE_N_FUNC_TAOS_FORBID
- #define __sync_val_compare_and_swap __SYNC_VAL_COMPARE_AND_SWAP_FUNC_TAOS_FORBID
- #define __atomic_add_fetch __ATOMIC_ADD_FETCH_FUNC_TAOS_FORBID
- #define __atomic_fetch_add __ATOMIC_FETCH_ADD_FUNC_TAOS_FORBID
- #define __atomic_sub_fetch __ATOMIC_SUB_FETCH_FUNC_TAOS_FORBID
- #define __atomic_fetch_sub __ATOMIC_FETCH_SUB_FUNC_TAOS_FORBID
- #define __atomic_and_fetch __ATOMIC_AND_FETCH_FUNC_TAOS_FORBID
- #define __atomic_fetch_and __ATOMIC_FETCH_AND_FUNC_TAOS_FORBID
- #define __atomic_or_fetch __ATOMIC_OR_FETCH_FUNC_TAOS_FORBID
- #define __atomic_fetch_or __ATOMIC_FETCH_OR_FUNC_TAOS_FORBID
- #define __atomic_xor_fetch __ATOMIC_XOR_FETCH_FUNC_TAOS_FORBID
- #define __atomic_fetch_xor __ATOMIC_FETCH_XOR_FUNC_TAOS_FORBID
+#define __atomic_load_n __ATOMIC_LOAD_N_FUNC_TAOS_FORBID
+#define __atomic_store_n __ATOMIC_STORE_N_FUNC_TAOS_FORBID
+#define __atomic_exchange_n __ATOMIC_EXCHANGE_N_FUNC_TAOS_FORBID
+#define __sync_val_compare_and_swap __SYNC_VAL_COMPARE_AND_SWAP_FUNC_TAOS_FORBID
+#define __atomic_add_fetch __ATOMIC_ADD_FETCH_FUNC_TAOS_FORBID
+#define __atomic_fetch_add __ATOMIC_FETCH_ADD_FUNC_TAOS_FORBID
+#define __atomic_sub_fetch __ATOMIC_SUB_FETCH_FUNC_TAOS_FORBID
+#define __atomic_fetch_sub __ATOMIC_FETCH_SUB_FUNC_TAOS_FORBID
+#define __atomic_and_fetch __ATOMIC_AND_FETCH_FUNC_TAOS_FORBID
+#define __atomic_fetch_and __ATOMIC_FETCH_AND_FUNC_TAOS_FORBID
+#define __atomic_or_fetch __ATOMIC_OR_FETCH_FUNC_TAOS_FORBID
+#define __atomic_fetch_or __ATOMIC_FETCH_OR_FUNC_TAOS_FORBID
+#define __atomic_xor_fetch __ATOMIC_XOR_FETCH_FUNC_TAOS_FORBID
+#define __atomic_fetch_xor __ATOMIC_FETCH_XOR_FUNC_TAOS_FORBID
#endif
-int8_t atomic_load_8(int8_t volatile *ptr);
+int8_t atomic_load_8(int8_t volatile *ptr);
int16_t atomic_load_16(int16_t volatile *ptr);
int32_t atomic_load_32(int32_t volatile *ptr);
int64_t atomic_load_64(int64_t volatile *ptr);
-void* atomic_load_ptr(void *ptr);
-void atomic_store_8(int8_t volatile *ptr, int8_t val);
-void atomic_store_16(int16_t volatile *ptr, int16_t val);
-void atomic_store_32(int32_t volatile *ptr, int32_t val);
-void atomic_store_64(int64_t volatile *ptr, int64_t val);
-void atomic_store_ptr(void *ptr, void *val);
-int8_t atomic_exchange_8(int8_t volatile *ptr, int8_t val);
+void *atomic_load_ptr(void *ptr);
+void atomic_store_8(int8_t volatile *ptr, int8_t val);
+void atomic_store_16(int16_t volatile *ptr, int16_t val);
+void atomic_store_32(int32_t volatile *ptr, int32_t val);
+void atomic_store_64(int64_t volatile *ptr, int64_t val);
+void atomic_store_ptr(void *ptr, void *val);
+int8_t atomic_exchange_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_exchange_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_exchange_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_exchange_64(int64_t volatile *ptr, int64_t val);
-void* atomic_exchange_ptr(void *ptr, void *val);
-int8_t atomic_val_compare_exchange_8(int8_t volatile *ptr, int8_t oldval, int8_t newval);
+void *atomic_exchange_ptr(void *ptr, void *val);
+int8_t atomic_val_compare_exchange_8(int8_t volatile *ptr, int8_t oldval, int8_t newval);
int16_t atomic_val_compare_exchange_16(int16_t volatile *ptr, int16_t oldval, int16_t newval);
int32_t atomic_val_compare_exchange_32(int32_t volatile *ptr, int32_t oldval, int32_t newval);
int64_t atomic_val_compare_exchange_64(int64_t volatile *ptr, int64_t oldval, int64_t newval);
-void* atomic_val_compare_exchange_ptr(void *ptr, void *oldval, void *newval);
-int8_t atomic_add_fetch_8(int8_t volatile *ptr, int8_t val);
+void *atomic_val_compare_exchange_ptr(void *ptr, void *oldval, void *newval);
+int8_t atomic_add_fetch_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_add_fetch_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_add_fetch_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_add_fetch_64(int64_t volatile *ptr, int64_t val);
-void* atomic_add_fetch_ptr(void *ptr, void *val);
-int8_t atomic_fetch_add_8(int8_t volatile *ptr, int8_t val);
+void *atomic_add_fetch_ptr(void *ptr, void *val);
+int8_t atomic_fetch_add_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_fetch_add_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_fetch_add_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_fetch_add_64(int64_t volatile *ptr, int64_t val);
-void* atomic_fetch_add_ptr(void *ptr, void *val);
-int8_t atomic_sub_fetch_8(int8_t volatile *ptr, int8_t val);
+void *atomic_fetch_add_ptr(void *ptr, void *val);
+int8_t atomic_sub_fetch_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_sub_fetch_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_sub_fetch_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_sub_fetch_64(int64_t volatile *ptr, int64_t val);
-void* atomic_sub_fetch_ptr(void *ptr, void *val);
-int8_t atomic_fetch_sub_8(int8_t volatile *ptr, int8_t val);
+void *atomic_sub_fetch_ptr(void *ptr, void *val);
+int8_t atomic_fetch_sub_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_fetch_sub_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_fetch_sub_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_fetch_sub_64(int64_t volatile *ptr, int64_t val);
-void* atomic_fetch_sub_ptr(void *ptr, void *val);
-int8_t atomic_and_fetch_8(int8_t volatile *ptr, int8_t val);
+void *atomic_fetch_sub_ptr(void *ptr, void *val);
+int8_t atomic_and_fetch_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_and_fetch_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_and_fetch_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_and_fetch_64(int64_t volatile *ptr, int64_t val);
-void* atomic_and_fetch_ptr(void *ptr, void *val);
-int8_t atomic_fetch_and_8(int8_t volatile *ptr, int8_t val);
+void *atomic_and_fetch_ptr(void *ptr, void *val);
+int8_t atomic_fetch_and_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_fetch_and_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_fetch_and_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_fetch_and_64(int64_t volatile *ptr, int64_t val);
-void* atomic_fetch_and_ptr(void *ptr, void *val);
-int8_t atomic_or_fetch_8(int8_t volatile *ptr, int8_t val);
+void *atomic_fetch_and_ptr(void *ptr, void *val);
+int8_t atomic_or_fetch_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_or_fetch_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_or_fetch_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_or_fetch_64(int64_t volatile *ptr, int64_t val);
-void* atomic_or_fetch_ptr(void *ptr, void *val);
-int8_t atomic_fetch_or_8(int8_t volatile *ptr, int8_t val);
+void *atomic_or_fetch_ptr(void *ptr, void *val);
+int8_t atomic_fetch_or_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_fetch_or_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_fetch_or_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_fetch_or_64(int64_t volatile *ptr, int64_t val);
-void* atomic_fetch_or_ptr(void *ptr, void *val);
-int8_t atomic_xor_fetch_8(int8_t volatile *ptr, int8_t val);
+void *atomic_fetch_or_ptr(void *ptr, void *val);
+int8_t atomic_xor_fetch_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_xor_fetch_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_xor_fetch_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_xor_fetch_64(int64_t volatile *ptr, int64_t val);
-void* atomic_xor_fetch_ptr(void *ptr, void *val);
-int8_t atomic_fetch_xor_8(int8_t volatile *ptr, int8_t val);
+void *atomic_xor_fetch_ptr(void *ptr, void *val);
+int8_t atomic_fetch_xor_8(int8_t volatile *ptr, int8_t val);
int16_t atomic_fetch_xor_16(int16_t volatile *ptr, int16_t val);
int32_t atomic_fetch_xor_32(int32_t volatile *ptr, int32_t val);
int64_t atomic_fetch_xor_64(int64_t volatile *ptr, int64_t val);
-void* atomic_fetch_xor_ptr(void *ptr, void *val);
+void *atomic_fetch_xor_ptr(void *ptr, void *val);
#ifdef __cplusplus
}
diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c
index 8225eca6596918c784d4d98ea49eca291b0c538d..35ba25acd54abf39351e51cfea42152f41b57b9e 100644
--- a/source/dnode/mnode/impl/src/mndDef.c
+++ b/source/dnode/mnode/impl/src/mndDef.c
@@ -17,6 +17,147 @@
#include "mndDef.h"
#include "mndConsumer.h"
+int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
+ int32_t sz = 0;
+ /*int32_t outputNameSz = 0;*/
+ if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
+ if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
+ if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
+ if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
+ if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
+ if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
+ if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
+ if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
+ if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
+ if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
+ if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
+ if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
+ if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
+ if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1;
+ if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1;
+ if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
+ if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
+ if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
+ /*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
+ if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
+ // TODO encode tasks
+ if (pObj->tasks) {
+ sz = taosArrayGetSize(pObj->tasks);
+ }
+ if (tEncodeI32(pEncoder, sz) < 0) return -1;
+
+ for (int32_t i = 0; i < sz; i++) {
+ SArray *pArray = taosArrayGetP(pObj->tasks, i);
+ int32_t innerSz = taosArrayGetSize(pArray);
+ if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
+ for (int32_t j = 0; j < innerSz; j++) {
+ SStreamTask *pTask = taosArrayGetP(pArray, j);
+ if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
+ }
+ }
+
+ if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
+
+#if 0
+ if (pObj->ColAlias != NULL) {
+ outputNameSz = taosArrayGetSize(pObj->ColAlias);
+ }
+ if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
+ for (int32_t i = 0; i < outputNameSz; i++) {
+ char *name = taosArrayGetP(pObj->ColAlias, i);
+ if (tEncodeCStr(pEncoder, name) < 0) return -1;
+ }
+#endif
+ return pEncoder->pos;
+}
+
+int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
+ if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
+ if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
+ if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
+ if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
+ if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
+ if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
+ if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
+ if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
+ if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
+ if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
+ if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
+ if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
+ if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
+ if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1;
+ if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1;
+ if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
+ if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
+ if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
+ /*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
+ if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
+ pObj->tasks = NULL;
+ int32_t sz;
+ if (tDecodeI32(pDecoder, &sz) < 0) return -1;
+ if (sz != 0) {
+ pObj->tasks = taosArrayInit(sz, sizeof(void *));
+ for (int32_t i = 0; i < sz; i++) {
+ int32_t innerSz;
+ if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
+ SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
+ for (int32_t j = 0; j < innerSz; j++) {
+ SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
+ if (pTask == NULL) return -1;
+ if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
+ taosArrayPush(pArray, &pTask);
+ }
+ taosArrayPush(pObj->tasks, &pArray);
+ }
+ }
+
+ if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
+#if 0
+ int32_t outputNameSz;
+ if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
+ if (outputNameSz != 0) {
+ pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
+ if (pObj->ColAlias == NULL) {
+ return -1;
+ }
+ }
+ for (int32_t i = 0; i < outputNameSz; i++) {
+ char *name;
+ if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
+ taosArrayPush(pObj->ColAlias, &name);
+ }
+#endif
+ return 0;
+}
+
+SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
+ SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
+ if (pVgEpNew == NULL) return NULL;
+ pVgEpNew->vgId = pVgEp->vgId;
+ pVgEpNew->qmsg = strdup(pVgEp->qmsg);
+ pVgEpNew->epSet = pVgEp->epSet;
+ return pVgEpNew;
+}
+
+void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
+ if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg);
+}
+
+int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
+ int32_t tlen = 0;
+ tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
+ tlen += taosEncodeString(buf, pVgEp->qmsg);
+ tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
+ return tlen;
+}
+
+void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
+ buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
+ buf = taosDecodeString(buf, &pVgEp->qmsg);
+ buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
+ return (void *)buf;
+}
+
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
@@ -187,34 +328,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
return (void *)buf;
}
-SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
- SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
- if (pVgEpNew == NULL) return NULL;
- pVgEpNew->vgId = pVgEp->vgId;
- pVgEpNew->qmsg = strdup(pVgEp->qmsg);
- pVgEpNew->epSet = pVgEp->epSet;
- return pVgEpNew;
-}
-
-void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
- if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg);
-}
-
-int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
- int32_t tlen = 0;
- tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
- tlen += taosEncodeString(buf, pVgEp->qmsg);
- tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
- return tlen;
-}
-
-void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
- buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
- buf = taosDecodeString(buf, &pVgEp->qmsg);
- buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
- return (void *)buf;
-}
-
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
if (pConsumerEpNew == NULL) return NULL;
@@ -413,119 +526,6 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
return (void *)buf;
}
-int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
- int32_t sz = 0;
- /*int32_t outputNameSz = 0;*/
- if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
- if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
- if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
- if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
- if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
- if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
- if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
- if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
- if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
- if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
- if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
- if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
- if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
- if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1;
- if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1;
- if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
- if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
- if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
- /*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
- if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
- // TODO encode tasks
- if (pObj->tasks) {
- sz = taosArrayGetSize(pObj->tasks);
- }
- if (tEncodeI32(pEncoder, sz) < 0) return -1;
-
- for (int32_t i = 0; i < sz; i++) {
- SArray *pArray = taosArrayGetP(pObj->tasks, i);
- int32_t innerSz = taosArrayGetSize(pArray);
- if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
- for (int32_t j = 0; j < innerSz; j++) {
- SStreamTask *pTask = taosArrayGetP(pArray, j);
- if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
- }
- }
-
- if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
-
-#if 0
- if (pObj->ColAlias != NULL) {
- outputNameSz = taosArrayGetSize(pObj->ColAlias);
- }
- if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
- for (int32_t i = 0; i < outputNameSz; i++) {
- char *name = taosArrayGetP(pObj->ColAlias, i);
- if (tEncodeCStr(pEncoder, name) < 0) return -1;
- }
-#endif
- return pEncoder->pos;
-}
-
-int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
- if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
- if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
- if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
- if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
- if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
- if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
- if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
- if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
- if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
- if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
- if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
- if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
- if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
- if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1;
- if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1;
- if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
- if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
- if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
- /*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
- if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
- pObj->tasks = NULL;
- int32_t sz;
- if (tDecodeI32(pDecoder, &sz) < 0) return -1;
- if (sz != 0) {
- pObj->tasks = taosArrayInit(sz, sizeof(void *));
- for (int32_t i = 0; i < sz; i++) {
- int32_t innerSz;
- if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
- SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
- for (int32_t j = 0; j < innerSz; j++) {
- SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
- if (pTask == NULL) return -1;
- if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
- taosArrayPush(pArray, &pTask);
- }
- taosArrayPush(pObj->tasks, &pArray);
- }
- }
-
- if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
-#if 0
- int32_t outputNameSz;
- if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
- if (outputNameSz != 0) {
- pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
- if (pObj->ColAlias == NULL) {
- return -1;
- }
- }
- for (int32_t i = 0; i < outputNameSz; i++) {
- char *name;
- if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
- taosArrayPush(pObj->ColAlias, &name);
- }
-#endif
- return 0;
-}
-
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pOffset->key);
diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp
index 974c86b4231b03b53821bd1aece9a40c18200eec..061ede345d15db174e44000458648b7a62ef1d31 100644
--- a/source/dnode/mnode/impl/test/trans/trans2.cpp
+++ b/source/dnode/mnode/impl/test/trans/trans2.cpp
@@ -510,4 +510,4 @@ TEST_F(MndTestTrans2, 04_Conflict) {
ASSERT_EQ(pUser, nullptr);
mndReleaseUser(pMnode, pUser);
}
-}
\ No newline at end of file
+}
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 952645190760826528e4e31d96c88cd8638eb7cf..ac8a271a4f3faa8a6e467e5f0c837fe7619db844 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1031,3 +1031,37 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId)
}
return 0;
}
+
+int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) {
+ SStreamTaskExecReq req = {0};
+ tDecodeSStreamTaskExecReq(msg, &req);
+ int32_t taskId = req.taskId;
+
+ SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
+ ASSERT(pTask);
+ ASSERT(pTask->inputType == TASK_INPUT_TYPE__DATA_BLOCK);
+
+ // enqueue
+ int32_t inputStatus = streamEnqueueDataBlk(pTask, (SStreamDataBlock*)req.data);
+ if (inputStatus == TASK_INPUT_STATUS__BLOCKED) {
+ // TODO rsp blocked
+ return 0;
+ }
+
+ // try exec
+ int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
+ if (execStatus == TASK_STATUS__IDLE) {
+ if (streamTaskExecNew(pTask) < 0) {
+ atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
+
+ goto FAIL;
+ }
+ } else if (execStatus == TASK_STATUS__EXECUTING) {
+ return 0;
+ }
+
+ // TODO rsp success
+ return 0;
+FAIL:
+ return -1;
+}
diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c
index 08093c8b184a47fa5e65d77ce4f5a56fe5d86ff6..33147a0c0aeb2eb65c00b72955e61f4d4c06a6fb 100644
--- a/source/libs/stream/src/tstream.c
+++ b/source/libs/stream/src/tstream.c
@@ -16,6 +16,25 @@
#include "tstream.h"
#include "executor.h"
+int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) {
+ int32_t tlen = 0;
+ tlen += taosEncodeFixedI8(buf, pOutput->type);
+ tlen += taosEncodeFixedI32(buf, pOutput->sourceVg);
+ tlen += taosEncodeFixedI64(buf, pOutput->sourceVer);
+ ASSERT(pOutput->type == STREAM_INPUT__DATA_BLOCK);
+ tlen += tEncodeDataBlocks(buf, pOutput->blocks);
+ return tlen;
+}
+
+void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
+ buf = taosDecodeFixedI8(buf, &pInput->type);
+ buf = taosDecodeFixedI32(buf, &pInput->sourceVg);
+ buf = taosDecodeFixedI64(buf, &pInput->sourceVer);
+ ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
+ buf = tDecodeDataBlocks(buf, &pInput->blocks);
+ return (void*)buf;
+}
+
static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamTaskExecReq req = {
.streamId = pTask->streamId,
@@ -97,6 +116,226 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
return 0;
}
+int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input) {
+ ASSERT(pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK);
+ int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
+ if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
+ streamDataSubmitRefInc(input);
+ taosWriteQitem(pTask->inputQ, input);
+ }
+ return inputStatus;
+}
+
+int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
+ ASSERT(pTask->inputType == TASK_INPUT_TYPE__DATA_BLOCK);
+ taosWriteQitem(pTask->inputQ, input);
+ int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
+ return inputStatus;
+}
+
+int32_t streamTaskProcessTriggerReq(SStreamTask* pTask, SMsgCb* pMsgCb, char* msg, int32_t msgLen) {
+ //
+ return 0;
+}
+
+int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* pBlock, SRpcMsg* pRsp) {
+ // 1. handle input
+ // 1.1 enqueue
+ taosWriteQitem(pTask->inputQ, pBlock);
+ // 1.2 calc back pressure
+ // 1.3 rsp by input status
+ int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
+ SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
+ pCont->status = inputStatus;
+ pRsp->pCont = pCont;
+ pRsp->contLen = sizeof(SStreamDispatchRsp);
+ tmsgSendRsp(pRsp);
+ // 2. try exec
+ // 2.1. idle: exec
+ // 2.2. executing: return
+ // 2.3. closing: keep trying
+ while (1) {
+ int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
+ void* exec = pTask->exec.runners[0].executor;
+ if (execStatus == TASK_STATUS__IDLE) {
+ SArray* pRes = taosArrayInit(0, sizeof(void*));
+ const SArray* blocks = pBlock->blocks;
+ qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
+ while (1) {
+ SSDataBlock* output;
+ uint64_t ts = 0;
+ if (qExecTask(exec, &output, &ts) < 0) {
+ ASSERT(false);
+ }
+ if (output == NULL) break;
+ taosArrayPush(pRes, &output);
+ }
+ // TODO: wrap destroy block
+ taosArrayDestroyP(pBlock->blocks, (FDelete)blockDataDestroy);
+
+ if (taosArrayGetSize(pRes) != 0) {
+ SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
+ *resQ = pRes;
+ taosWriteQitem(pTask->outputQ, resQ);
+ }
+
+ } else if (execStatus == TASK_STATUS__CLOSING) {
+ continue;
+ } else if (execStatus == TASK_STATUS__EXECUTING)
+ break;
+ else {
+ ASSERT(0);
+ }
+ }
+ // 3. handle output
+ // 3.1 check and set status
+ // 3.2 dispatch / sink
+ STaosQall* qall = taosAllocateQall();
+ taosReadAllQitems(pTask->outputQ, qall);
+ SArray** ppRes = NULL;
+ while (1) {
+ taosGetQitem(qall, (void**)&ppRes);
+ if (ppRes == NULL) break;
+
+ SArray* pRes = *ppRes;
+ if (pTask->sinkType == TASK_SINK__TABLE) {
+ pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->sourceVer, pRes);
+ } else if (pTask->sinkType == TASK_SINK__SMA) {
+ pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
+ } else {
+ }
+
+ // dispatch
+ if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
+ SRpcMsg dispatchMsg = {0};
+ if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
+ ASSERT(0);
+ return -1;
+ }
+
+ int32_t qType;
+ if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
+ qType = FETCH_QUEUE;
+ } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
+ pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
+ qType = MERGE_QUEUE;
+ } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
+ qType = WRITE_QUEUE;
+ } else {
+ ASSERT(0);
+ }
+ tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
+
+ } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
+ SRpcMsg dispatchMsg = {0};
+ SEpSet* pEpSet = NULL;
+ if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
+ ASSERT(0);
+ return -1;
+ }
+
+ tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
+
+ } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
+ SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
+ if (pShuffleRes == NULL) {
+ return -1;
+ }
+
+ int32_t sz = taosArrayGetSize(pRes);
+ for (int32_t i = 0; i < sz; i++) {
+ SSDataBlock* pDataBlock = taosArrayGet(pRes, i);
+ SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t));
+ if (pArray == NULL) {
+ pArray = taosArrayInit(0, sizeof(SSDataBlock));
+ if (pArray == NULL) {
+ return -1;
+ }
+ taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*));
+ }
+ taosArrayPush(pArray, pDataBlock);
+ }
+
+ if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) {
+ return -1;
+ }
+
+ } else {
+ ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
+ }
+ }
+ //
+ return 0;
+}
+
+int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, char* msg, int32_t msgLen) {
+ //
+ return 0;
+}
+
+int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
+ //
+ return 0;
+}
+
+int32_t streamTaskExecNew(SStreamTask* pTask) {
+ SArray* pRes = NULL;
+ if (pTask->execType == TASK_EXEC__PIPE || pTask->execType == TASK_EXEC__MERGE) {
+ // TODO remove multi runner
+ void* exec = pTask->exec.runners[0].executor;
+
+ int8_t status = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
+ if (status == TASK_STATUS__IDLE) {
+ pRes = taosArrayInit(0, sizeof(void*));
+ if (pRes == NULL) {
+ return -1;
+ }
+
+ void* input = NULL;
+ taosWriteQitem(pTask->inputQ, &input);
+ if (input == NULL) return 0;
+
+ // TODO: fix type
+ if (pTask->sourceType == TASK_SOURCE__SCAN) {
+ SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input;
+ qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
+ while (1) {
+ SSDataBlock* output;
+ uint64_t ts = 0;
+ if (qExecTask(exec, &output, &ts) < 0) {
+ ASSERT(false);
+ }
+ if (output == NULL) break;
+ taosArrayPush(pRes, &output);
+ }
+ streamDataSubmitRefDec(pSubmit);
+ } else {
+ SStreamDataBlock* pStreamBlock = (SStreamDataBlock*)input;
+ const SArray* blocks = pStreamBlock->blocks;
+ qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
+ while (1) {
+ SSDataBlock* output;
+ uint64_t ts = 0;
+ if (qExecTask(exec, &output, &ts) < 0) {
+ ASSERT(false);
+ }
+ if (output == NULL) break;
+ taosArrayPush(pRes, &output);
+ }
+ // TODO: wrap destroy block
+ taosArrayDestroyP(pStreamBlock->blocks, (FDelete)blockDataDestroy);
+ }
+
+ if (taosArrayGetSize(pRes) != 0) {
+ SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
+ *resQ = pRes;
+ taosWriteQitem(pTask->outputQ, resQ);
+ }
+ }
+ }
+ return 0;
+}
+
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
SArray* pRes = NULL;
// source
@@ -251,9 +490,17 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
}
pTask->taskId = tGenIdPI32();
pTask->streamId = streamId;
- pTask->status = STREAM_TASK_STATUS__RUNNING;
- /*pTask->qmsg = NULL;*/
+ pTask->status = TASK_STATUS__IDLE;
+
+ pTask->inputQ = taosOpenQueue();
+ pTask->outputQ = taosOpenQueue();
+ if (pTask->inputQ == NULL || pTask->outputQ == NULL) goto FAIL;
return pTask;
+FAIL:
+ if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
+ if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
+ if (pTask) taosMemoryFree(pTask);
+ return NULL;
}
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
@@ -349,10 +596,16 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
void tFreeSStreamTask(SStreamTask* pTask) {
+ taosCloseQueue(pTask->inputQ);
+ taosCloseQueue(pTask->outputQ);
// TODO
- /*taosMemoryFree(pTask->qmsg);*/
+ if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
+ for (int32_t i = 0; i < pTask->exec.numOfRunners; i++) {
+ qDestroyTask(pTask->exec.runners[i].executor);
+ }
+ taosMemoryFree(pTask->exec.runners);
/*taosMemoryFree(pTask->executor);*/
- /*taosMemoryFree(pTask);*/
+ taosMemoryFree(pTask);
}
#if 0
diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim
index 3bb5943b3b48b23115b48bb96e1bf41c186836f3..37f9cb94c9753fbed5afebd3c66210ced04d21a8 100644
--- a/tests/script/tsim/tstream/basic1.sim
+++ b/tests/script/tsim/tstream/basic1.sim
@@ -136,7 +136,7 @@ if $data35 != 3 then
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
-sleep 100
+sleep 500
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then