diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 23dd3498610810363ebe4cc8ad69b90881936673..cb8b077b476b778df3cc4b8b0ed097b09400bf10 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -58,7 +58,10 @@ typedef struct SDataBlockInfo { int32_t rows; int32_t rowSize; int32_t numOfCols; - union {int64_t uid; int64_t blockId;}; + union { + int64_t uid; + int64_t blockId; + }; } SDataBlockInfo; typedef struct SConstantItem { @@ -70,10 +73,10 @@ typedef struct SConstantItem { // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); typedef struct SSDataBlock { - SColumnDataAgg *pBlockAgg; - SArray *pDataBlock; // SArray - SArray *pConstantList; // SArray, it is a constant/tags value of the corresponding result value. - SDataBlockInfo info; + SColumnDataAgg* pBlockAgg; + SArray* pDataBlock; // SArray + SArray* pConstantList; // SArray, it is a constant/tags value of the corresponding result value. + SDataBlockInfo info; } SSDataBlock; typedef struct SVarColAttr { @@ -244,7 +247,7 @@ typedef struct SGroupbyExpr { typedef struct SFunctParam { int32_t type; - SColumn *pCol; + SColumn* pCol; SVariant param; } SFunctParam; @@ -262,12 +265,12 @@ typedef struct SResSchame { typedef struct SExprBasicInfo { SResSchema resSchema; int16_t numOfParams; // argument value of each function - SFunctParam *pParam; + SFunctParam* pParam; } SExprBasicInfo; typedef struct SExprInfo { - struct SExprBasicInfo base; - struct tExprNode *pExpr; + struct SExprBasicInfo base; + struct tExprNode* pExpr; } SExprInfo; typedef struct SStateWindow { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a3e35e18741cb764f2a0d65c9c1fab007a876a90..30f61374eaf489172b83383927aa612ad6cdc69e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -24,6 +24,7 @@ #include "thash.h" #include "tlist.h" #include "trow.h" +#include "tuuid.h" #ifdef __cplusplus extern "C" { @@ -171,7 +172,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; - int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SBuildUseDBInput; typedef struct SField { @@ -427,10 +428,10 @@ typedef struct { int16_t slotId; }; - int16_t type; - int32_t bytes; - uint8_t precision; - uint8_t scale; + int16_t type; + int32_t bytes; + uint8_t precision; + uint8_t scale; } SColumnInfo; typedef struct { @@ -526,7 +527,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; - int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); @@ -553,15 +554,13 @@ int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); typedef struct { - SArray *epSetList; // SArray + SArray* epSetList; // SArray } SQnodeListRsp; int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); void tFreeSQnodeListRsp(SQnodeListRsp* pRsp); - - typedef struct { SArray* pArray; // Array of SUseDbRsp } SUseDbBatchRsp; @@ -777,7 +776,6 @@ typedef struct SVgroupInfo { int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SVgroupInfo; - typedef struct { int32_t numOfVgroups; SVgroupInfo vgroups[]; @@ -1062,8 +1060,8 @@ typedef struct { } STaskStatus; typedef struct { - int64_t refId; - SArray *taskStatus; //SArray + int64_t refId; + SArray* taskStatus; // SArray } SSchedulerStatusRsp; typedef struct { @@ -1072,35 +1070,31 @@ typedef struct { int8_t action; } STaskAction; - typedef struct SQueryNodeEpId { int32_t nodeId; // vgId or qnodeId SEp ep; } SQueryNodeEpId; - typedef struct { SMsgHead header; uint64_t sId; SQueryNodeEpId epId; - SArray *taskAction; //SArray + SArray* taskAction; // SArray } SSchedulerHbReq; -int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq); -int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq); -void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq); - +int32_t tSerializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq); +int32_t tDeserializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq); +void tFreeSSchedulerHbReq(SSchedulerHbReq* pReq); typedef struct { uint64_t seqId; SQueryNodeEpId epId; - SArray *taskStatus; //SArray + SArray* taskStatus; // SArray } SSchedulerHbRsp; -int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp); -int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp); -void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp); - +int32_t tSerializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp); +int32_t tDeserializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp); +void tFreeSSchedulerHbRsp(SSchedulerHbRsp* pRsp); typedef struct { SMsgHead header; @@ -1370,7 +1364,7 @@ typedef struct SVCreateTbReq { } SVCreateTbReq, SVUpdateTbReq; typedef struct { - int tmp; // TODO: to avoid compile error + int tmp; // TODO: to avoid compile error } SVCreateTbRsp, SVUpdateTbRsp; int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); @@ -1382,7 +1376,7 @@ typedef struct { } SVCreateTbBatchReq; typedef struct { - int tmp; // TODO: to avoid compile error + int tmp; // TODO: to avoid compile error } SVCreateTbBatchRsp; int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); @@ -1396,7 +1390,7 @@ typedef struct { } SVDropTbReq; typedef struct { - int tmp; // TODO: to avoid compile error + int tmp; // TODO: to avoid compile error } SVDropTbRsp; int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq); @@ -1933,7 +1927,7 @@ typedef struct { } SVCreateTSmaReq; typedef struct { - int8_t type; // 0 status report, 1 update data + int8_t type; // 0 status report, 1 update data char indexName[TSDB_INDEX_NAME_LEN]; // STimeWindow windows; } STSmaMsg; @@ -1944,7 +1938,7 @@ typedef struct { } SVDropTSmaReq; typedef struct { - int tmp; // TODO: to avoid compile error + int tmp; // TODO: to avoid compile error } SVCreateTSmaRsp, SVDropTSmaRsp; int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq); @@ -2029,7 +2023,7 @@ static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { tlen += taosEncodeFixedI64(buf, pSma->tableUid); tlen += taosEncodeFixedI64(buf, pSma->interval); tlen += taosEncodeFixedI64(buf, pSma->sliding); - + if (pSma->exprLen > 0) { tlen += taosEncodeString(buf, pSma->expr); } @@ -2064,7 +2058,6 @@ static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { buf = taosDecodeFixedI64(buf, &pSma->interval); buf = taosDecodeFixedI64(buf, &pSma->sliding); - if (pSma->exprLen > 0) { pSma->expr = (char*)calloc(pSma->exprLen, 1); if (pSma->expr != NULL) { @@ -2265,6 +2258,51 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p return buf; } +enum { + STREAM_TASK_STATUS__RUNNING = 1, + STREAM_TASK_STATUS__STOP, +}; + +typedef struct { + int64_t streamId; + int32_t taskId; + int32_t level; + int8_t status; + char* qmsg; + void* executor; + // void* stateStore; + // storage handle +} SStreamTask; + +static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level) { + SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return NULL; + } + pTask->taskId = tGenIdPI32(); + pTask->status = STREAM_TASK_STATUS__RUNNING; + pTask->qmsg = NULL; + return pTask; +} + +int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask); +int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask); +void tFreeSStreamTask(SStreamTask* pTask); + +typedef struct { + SMsgHead head; + SStreamTask* task; +} SStreamTaskDeployReq; + +typedef struct { + int32_t reserved; +} SStreamTaskDeployRsp; + +typedef struct { + SMsgHead head; + // TODO: other info needed by task +} SStreamTaskExecReq; + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 03f8daad42e1106b2acecfc2097c456fc049c7af..b30a325d7cc962c09730d3293a6acc5b9995f27c 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -199,6 +199,7 @@ enum { // Requests handled by SNODE TD_NEW_MSG_SEG(TDMT_SND_MSG) + TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 21a93532e0d77b3c8f5e6157380e7804b393d622..b25f8a8666bec1e76ef394572ae24f9fba0e14dd 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -16,6 +16,7 @@ #ifndef _TD_SNODE_H_ #define _TD_SNODE_H_ +#include "tcommon.h" #include "tmsg.h" #include "trpc.h" @@ -78,7 +79,7 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); * @param pRsp The response message * @return int32_t 0 for success, -1 for failure */ -int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +// int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f26f19f3b2b6c1bc122fac359442a5481b265bd7..121f5271b3d61788bd41f5516e07d662352d1739 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1467,8 +1467,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { return 0; } - -int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq) { +int32_t tSerializeSQnodeListReq(void *buf, int32_t bufLen, SQnodeListReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); @@ -1499,7 +1498,7 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) if (tStartEncode(&encoder) < 0) return -1; int32_t num = taosArrayGetSize(pRsp->epSetList); - if (tEncodeI32(&encoder, num) < 0) return -1; + if (tEncodeI32(&encoder, num) < 0) return -1; for (int32_t i = 0; i < num; ++i) { SEpSet *epSet = taosArrayGet(pRsp->epSetList, i); if (tEncodeSEpSet(&encoder, epSet) < 0) return -1; @@ -2491,27 +2490,27 @@ int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pR tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; - if (tEncodeI32(&encoder, pReq->epId.nodeId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->epId.nodeId) < 0) return -1; if (tEncodeU16(&encoder, pReq->epId.ep.port) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->epId.ep.fqdn) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->epId.ep.fqdn) < 0) return -1; if (pReq->taskAction) { int32_t num = taosArrayGetSize(pReq->taskAction); - if (tEncodeI32(&encoder, num) < 0) return -1; + if (tEncodeI32(&encoder, num) < 0) return -1; for (int32_t i = 0; i < num; ++i) { STaskAction *action = taosArrayGet(pReq->taskAction, i); - if (tEncodeU64(&encoder, action->queryId) < 0) return -1; - if (tEncodeU64(&encoder, action->taskId) < 0) return -1; - if (tEncodeI8(&encoder, action->action) < 0) return -1; + if (tEncodeU64(&encoder, action->queryId) < 0) return -1; + if (tEncodeU64(&encoder, action->taskId) < 0) return -1; + if (tEncodeI8(&encoder, action->action) < 0) return -1; } } else { - if (tEncodeI32(&encoder, 0) < 0) return -1; + if (tEncodeI32(&encoder, 0) < 0) return -1; } tEndEncode(&encoder); int32_t tlen = encoder.pos; tCoderClear(&encoder); - + if (buf != NULL) { SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); pHead->vgId = htonl(pReq->header.vgId); @@ -2559,29 +2558,27 @@ int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq * void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq) { taosArrayDestroy(pReq->taskAction); } - - int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeU64(&encoder, pRsp->seqId) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1; + if (tEncodeU64(&encoder, pRsp->seqId) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1; if (tEncodeU16(&encoder, pRsp->epId.ep.port) < 0) return -1; - if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1; if (pRsp->taskStatus) { int32_t num = taosArrayGetSize(pRsp->taskStatus); - if (tEncodeI32(&encoder, num) < 0) return -1; + if (tEncodeI32(&encoder, num) < 0) return -1; for (int32_t i = 0; i < num; ++i) { STaskStatus *status = taosArrayGet(pRsp->taskStatus, i); - if (tEncodeU64(&encoder, status->queryId) < 0) return -1; - if (tEncodeU64(&encoder, status->taskId) < 0) return -1; - if (tEncodeI64(&encoder, status->refId) < 0) return -1; - if (tEncodeI8(&encoder, status->status) < 0) return -1; + if (tEncodeU64(&encoder, status->queryId) < 0) return -1; + if (tEncodeU64(&encoder, status->taskId) < 0) return -1; + if (tEncodeI64(&encoder, status->refId) < 0) return -1; + if (tEncodeI8(&encoder, status->status) < 0) return -1; } } else { - if (tEncodeI32(&encoder, 0) < 0) return -1; + if (tEncodeI32(&encoder, 0) < 0) return -1; } tEndEncode(&encoder); @@ -2694,3 +2691,32 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { tfree(pReq->physicalPlan); tfree(pReq->logicalPlan); } + +int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; + if (tDecodeCStr(pDecoder, (const char **)&pTask->qmsg) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +void tFreeSStreamTask(SStreamTask *pTask) { + // TODO + /*free(pTask->qmsg);*/ + /*free(pTask->executor);*/ + /*free(pTask);*/ +} diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index 5ea8a841d22a1b0cf03c554285d5c7107aca7205..8667952f2c71a5febe171e0eae6af4e61ad725f4 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -323,8 +323,8 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + /*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/ + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; SSnode *pSnode = dndAcquireSnode(pDnode); if (pSnode != NULL) { @@ -334,22 +334,35 @@ static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t sndProcessUMsg(pSnode, pMsg); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + dndReleaseSnode(pDnode, pSnode); + } else { + for (int32_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } - dndReleaseSnode(pDnode, pSnode); } static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + /*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/ + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; SSnode *pSnode = dndAcquireSnode(pDnode); if (pSnode != NULL) { - code = sndProcessSMsg(pSnode, pMsg); + sndProcessSMsg(pSnode, pMsg); + dndReleaseSnode(pDnode, pSnode); + } else { + SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rpcRsp); } - dndReleaseSnode(pDnode, pSnode); #if 0 if (pMsg->msgType & 1u) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a7116b72ffe66df005ae46441053df0e1d79bcc8..8ea9cc141f8d673efc8e33cb734a6854f2d8bbe9 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -85,6 +85,8 @@ typedef enum { TRN_TYPE_REBALANCE = 1017, TRN_TYPE_COMMIT_OFFSET = 1018, TRN_TYPE_CREATE_STREAM = 1019, + TRN_TYPE_DROP_STREAM = 1020, + TRN_TYPE_ALTER_STREAM = 1021, TRN_TYPE_BASIC_SCOPE_END, TRN_TYPE_GLOBAL_SCOPE = 2000, TRN_TYPE_CREATE_DNODE = 2001, @@ -679,12 +681,6 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons return buf; } -typedef struct { - int32_t taskId; - int32_t level; - SSubplan* plan; -} SStreamTaskMeta; - typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; @@ -700,7 +696,7 @@ typedef struct { char* sql; char* logicalPlan; char* physicalPlan; - SArray* tasks; // SArray> + SArray* tasks; // SArray> } SStreamObj; int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 3bf6e0c33a6a07c633c38323c133099fed62e17b..42951beca2e414611543d92e08b86d19f6247636 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,6 +27,8 @@ void mndCleanupScheduler(SMnode* pMnode); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 855e244daafc2ed247348b7141d3d41d45aa399b..16b1ba8a5c9cc86db1735e7a87da81c969eda4cf 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -31,7 +31,7 @@ #include "tname.h" #include "tuuid.h" -int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; SVgObj* pVgroup = NULL; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); @@ -41,17 +41,18 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } ASSERT(pStream->vgNum == 0); - int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); - pStream->tasks = taosArrayInit(levelNum, sizeof(SArray)); + int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); + pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); - for (int32_t i = 0; i < levelNum; i++) { - SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTaskMeta)); - SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, i); + int32_t msgLen; + for (int32_t level = 0; level < totLevel; level++) { + SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask)); + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level); int32_t opNum = LIST_LENGTH(inner->pNodeList); ASSERT(opNum == 1); - SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); - if (i == 0) { + SSubplan* plan = nodesListGetNode(inner->pNodeList, level); + if (level == 0) { ASSERT(plan->type == SUBPLAN_TYPE_SCAN); void* pIter = NULL; while (1) { @@ -63,15 +64,19 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } pStream->vgNum++; + // send to vnode + + SStreamTask* pTask = streamTaskNew(pStream->uid, level); + plan->execNode.nodeId = pVgroup->vgId; plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); - SStreamTaskMeta task = { - .taskId = tGenIdPI32(), - .level = i, - .plan = plan, - }; - // send to vnode - taosArrayPush(taskOneLevel, &task); + if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + taosArrayPush(taskOneLevel, pTask); } } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { // duplicatable @@ -82,22 +87,36 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { // if has snode, set to shared thread num in snode parallel = SND_SHARED_THREAD_NUM; - for (int32_t j = 0; j < parallel; j++) { - SStreamTaskMeta task = { - .taskId = tGenIdPI32(), - .level = i, - .plan = plan, - }; - taosArrayPush(taskOneLevel, &task); + for (int32_t i = 0; i < parallel; i++) { + SStreamTask* pTask = streamTaskNew(pStream->uid, level); + + // TODO:get snode id and ep + plan->execNode.nodeId = pVgroup->vgId; + plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) { + qDestroyQueryPlan(pPlan); + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + + taosArrayPush(taskOneLevel, pTask); } } else { // not duplicatable - SStreamTaskMeta task = { - .taskId = tGenIdPI32(), - .level = i, - .plan = plan, - }; - taosArrayPush(taskOneLevel, &task); + SStreamTask* pTask = streamTaskNew(pStream->uid, level); + + // TODO:get snode id and ep + plan->execNode.nodeId = pVgroup->vgId; + plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + taosArrayPush(taskOneLevel, pTask); } taosArrayPush(pStream->tasks, taskOneLevel); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 54ad9cd7e239b20a84eaa6f7c791b1942413598f..67011dfe8a3979e315eb234a0223ee245dcb9ea0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -18,6 +18,7 @@ #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" +#include "mndScheduler.h" #include "mndShow.h" #include "mndStb.h" #include "mndTrans.h" @@ -237,6 +238,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR } sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); + if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { + mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr()); + mndTransDrop(pTrans); + return -1; + } + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); diff --git a/source/dnode/snode/CMakeLists.txt b/source/dnode/snode/CMakeLists.txt index dafd5d659416b04f027e85aa33122c6371e6d153..d1b1abdf1d523aca7fd0ccb56180b7535fde40fd 100644 --- a/source/dnode/snode/CMakeLists.txt +++ b/source/dnode/snode/CMakeLists.txt @@ -7,8 +7,9 @@ target_include_directories( ) target_link_libraries( snode + PRIVATE executor PRIVATE transport PRIVATE os PRIVATE common PRIVATE util -) \ No newline at end of file +) diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 3fe816845ddb3272b0f99c1e2d5fb5fcce9a03cd..e5f6c3c266109d314fd7f171d94261da0449e002 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -38,13 +38,8 @@ enum { STREAM_STATUS__DELETING, }; -enum { - STREAM_TASK_STATUS__RUNNING = 1, - STREAM_TASK_STATUS__STOP, -}; - typedef struct { - SHashObj* pHash; // taskId -> streamTask + SHashObj* pHash; // taskId -> SStreamTask } SStreamMeta; typedef struct SSnode { @@ -52,26 +47,16 @@ typedef struct SSnode { SSnodeOpt cfg; } SSnode; -typedef struct { - int64_t streamId; - int32_t taskId; - int32_t IdxInLevel; - int32_t level; -} SStreamTaskInfo; +SStreamMeta* sndMetaNew(); +void sndMetaDelete(SStreamMeta* pMeta); -typedef struct { - SStreamTaskInfo meta; - int8_t status; - void* executor; - void* stateStore; - // storage handle -} SStreamTask; +int32_t sndMetaDeployTask(SStreamMeta* pMeta, SStreamTask* pTask); +int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); -int32_t sndCreateTask(); -int32_t sndDropTaskOfStream(int64_t streamId); +int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId); -int32_t sndStopTaskOfStream(int64_t streamId); -int32_t sndResumeTaskOfStream(int64_t streamId); +int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId); +int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId); #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 74e41d45c527f3a1de2cfbc353e25159dc82cfc6..80e33bd9719c950f6c8ce8d158f8aae03bcec2c6 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -13,40 +13,91 @@ * along with this program. If not, see . */ +#include "executor.h" #include "sndInt.h" #include "tuuid.h" SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *pSnode = calloc(1, sizeof(SSnode)); + if (pSnode == NULL) { + return NULL; + } memcpy(&pSnode->cfg, pOption, sizeof(SSnodeOpt)); + pSnode->pMeta = sndMetaNew(); + if (pSnode->pMeta == NULL) { + free(pSnode); + return NULL; + } return pSnode; } -void sndClose(SSnode *pSnode) { free(pSnode); } +void sndClose(SSnode *pSnode) { + sndMetaDelete(pSnode->pMeta); + free(pSnode); +} int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; } -int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - *pRsp = NULL; - return 0; -} +/*int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {*/ +/**pRsp = NULL;*/ +/*return 0;*/ +/*}*/ void sndDestroy(const char *path) {} -static int32_t sndDeployTask(SSnode *pSnode, SRpcMsg *pMsg) { - SStreamTask *task = malloc(sizeof(SStreamTask)); - if (task == NULL) { +SStreamMeta *sndMetaNew() { + SStreamMeta *pMeta = calloc(1, sizeof(SStreamMeta)); + if (pMeta == NULL) { + return NULL; + } + pMeta->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pMeta->pHash == NULL) { + free(pMeta); + return NULL; + } + return pMeta; +} + +void sndMetaDelete(SStreamMeta *pMeta) { + taosHashCleanup(pMeta->pHash); + free(pMeta); +} + +int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { + pTask->executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); + return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); +} + +int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) { + SStreamTask *pTask = taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); + if (pTask == NULL) { return -1; } - task->meta.taskId = tGenIdPI32(); - taosHashPut(pSnode->pMeta->pHash, &task->meta.taskId, sizeof(int32_t), &task, sizeof(void *)); - return 0; + free(pTask->qmsg); + // TODO:free executor + free(pTask); + return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t)); } int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { - // stream deployment + // stream deploy // stream stop/resume // operator exec + if (pMsg->msgType == TDMT_SND_TASK_DEPLOY) { + void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + SStreamTask *pTask = malloc(sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + SCoder decoder; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER); + tDecodeSStreamTask(&decoder, pTask); + tCoderClear(&decoder); + + sndMetaDeployTask(pSnode->pMeta, pTask); + } else { + // + } return 0; }