diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 943fcbdb5329c4c53e9f4c663497419091f8b4d2..ab59fa5e474cb8e6dde123b18c8cc265570aeeaa 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -82,7 +82,8 @@ int32_t create_stream() { /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query( - pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)"); + pConn, + "create stream stream1 trigger max_delay 10s into outstb as select _wstartts, sum(k) from st1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 288248422b8288b98d8f0fccaef040186294cb76..3f05c27453d89eaccd845c859064b579b5d8538f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -38,8 +38,10 @@ typedef struct SReadHandle { SMsgCb* pMsgCb; } SReadHandle; -#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 -#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2 +enum { + STREAM_DATA_TYPE_SUBMIT_BLOCK = 1, + STREAM_DATA_TYPE_SSDATA_BLOCK = 2, +}; typedef enum { OPTR_EXEC_MODEL_BATCH = 0x1, @@ -102,7 +104,8 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, * @param tversion * @return */ -int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion); +int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, + int32_t* tversion); /** * The main task execution function, including query on both table and multiple tables, diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 960794792ba408c565b5a7430c5a412b1b60cb4c..2c9d66a828e658cd286d2e5bf4edcc28d7ea37a9 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -58,6 +58,7 @@ enum { enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, + STREAM_INPUT__TRIGGER, STREAM_INPUT__CHECKPOINT, }; @@ -85,6 +86,11 @@ typedef struct { int8_t type; } SStreamCheckpoint; +typedef struct { + int8_t type; + SSDataBlock* pBlock; +} SStreamTrigger; + enum { STREAM_QUEUE__SUCESS = 1, STREAM_QUEUE__FAILED, @@ -98,6 +104,9 @@ typedef struct { int8_t status; } SStreamQueue; +int32_t streamInit(); +void streamCleanUp(); + SStreamQueue* streamQueueOpen(); void streamQueueClose(SStreamQueue* queue); @@ -220,6 +229,11 @@ enum { TASK_INPUT_TYPE__DATA_BLOCK, }; +enum { + TASK_TRIGGER_STATUS__IN_ACTIVE = 1, + TASK_TRIGGER_STATUS__ACTIVE, +}; + struct SStreamTask { int64_t streamId; int32_t taskId; @@ -262,8 +276,16 @@ struct SStreamTask { SStreamQueue* inputQueue; SStreamQueue* outputQueue; + // trigger + int8_t triggerStatus; + int64_t triggerParam; + void* timer; + // application storage // void* ahandle; + + // msg handle + SMsgCb* pMsgCb; }; SStreamTask* tNewSStreamTask(int64_t streamId); @@ -292,6 +314,13 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); + } else if (pItem->type == STREAM_INPUT__TRIGGER) { + taosWriteQitem(pTask->inputQueue->queue, pItem); + } + + if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0 && + pTask->triggerStatus == TASK_TRIGGER_STATUS__IN_ACTIVE) { + atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE); } // TODO: back pressure @@ -370,7 +399,8 @@ typedef struct { int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); -int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); +int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); +int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamTaskRun(SStreamTask* pTask); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 10c6c3623ad219625b688a9b1dce5097d337f6b9..dd900e6045294c8d6d6eedee733a682758a24cb9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -992,6 +992,90 @@ CREATE_MSG_FAIL: return -1; } +bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { + bool set = false; + + int32_t topicNumGet = taosArrayGetSize(pRsp->topics); + char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, + topicNumGet); + + SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); + if (newTopics == NULL) { + return false; + } + + SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); + if (pHash == NULL) { + taosArrayDestroy(newTopics); + return false; + } + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + for (int32_t i = 0; i < topicNumCur; i++) { + // find old topic + SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); + if (pTopicCur->vgs) { + int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); + tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur); + if (vgNumCur == 0) break; + for (int32_t j = 0; j < vgNumCur; j++) { + SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); + sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); + tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey); + taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); + } + break; + } + } + + for (int32_t i = 0; i < topicNumGet; i++) { + SMqClientTopic topic = {0}; + SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); + topic.schema = pTopicEp->schema; + taosHashClear(pHash); + topic.topicName = strdup(pTopicEp->topic); + tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN); + + tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName); + + int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); + topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgNumGet; j++) { + SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); + int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); + int64_t offset = tmq->resetOffsetCfg; + if (pOffset != NULL) { + offset = *pOffset; + } + + tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); + SMqClientVg clientVg = { + .pollCnt = 0, + .currentOffset = offset, + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet, + .vgStatus = TMQ_VG_STATUS__IDLE, + .vgSkipCnt = 0, + }; + taosArrayPush(topic.vgs, &clientVg); + set = true; + } + taosArrayPush(newTopics, &topic); + } + if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); + taosHashCleanup(pHash); + tmq->clientTopics = newTopics; + + if (taosArrayGetSize(tmq->clientTopics) == 0) + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); + else + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); + + atomic_store_32(&tmq->epoch, epoch); + return set; +} + bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { /*printf("call update ep %d\n", epoch);*/ bool set = false; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 6dd5d1622c2e00c8b35b6e7bdb82d8c11b40aa1a..2e543149c075225bd1733918f5f76e76d1ffe32c 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -554,8 +554,8 @@ typedef struct { SVgObj fixedSinkVg; int64_t smaId; // 0 for unused int8_t trigger; - int32_t triggerParam; - int64_t waterMark; + int64_t triggerParam; + int64_t watermark; char* sql; char* physicalPlan; SArray* tasks; // SArray> diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 15cd9fa0439a9b7b8a411adf028645caea27aecc..b5d22cb7a5ba931ccbf5c5986ec41d7cbf5e6e5e 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 1e2c61ff89025d34a828a366f7d202267112e1f8..6eac91bb43691ac8698746c450d5c22b0f25f3df 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -33,8 +33,8 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; - if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; @@ -85,8 +85,8 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; - if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a3b7754a4edac3d6c1d9d122b2d4c794096cfddf..5b745f3ceaed71642d2cc8c0798fa44f6bb160a5 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -387,6 +387,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // input pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; + // trigger + pFinalTask->triggerParam = pStream->triggerParam; + // dispatch if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) { qDestroyQueryPlan(pPlan); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index a14eb78ffefafb44fb3a498bb2bb947a11073e5c..003afb4fa0998dd474af07db52013df0f8b60c81 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -561,6 +561,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.sql = pCreate->sql; streamObj.createdBy = STREAM_CREATED_BY__SMA; streamObj.smaId = smaObj.uid; + streamObj.watermark = 0; + streamObj.trigger = STREAM_TRIGGER_AT_ONCE; if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) { mError("sma:%s, failed to create since %s", smaObj.name, terrstr()); @@ -583,7 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -1012,7 +1014,6 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool rsp->suid = pStb->uid; rsp->version = pStb->smaVer; mndReleaseStb(pMnode, pStb); - while (1) { pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 96d199fcb6e6a79b61f58ce1e96f7c2e2aea2ea8..4a7da8f2d142aa5eb664b4b7b024e0e64bd92628 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -235,16 +235,15 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 } if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString((SNode*)pPlan, false, pStr, NULL); + code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL); } nodesDestroyNode(pAst); - nodesDestroyNode((SNode*)pPlan); + nodesDestroyNode((SNode *)pPlan); terrno = code; return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, - STrans *pTrans) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { @@ -258,7 +257,6 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast // free nodesDestroyNode(pAst); - #if 0 printf("|"); for (int i = 0; i < pStream->outputSchema.nCols; i++) { @@ -268,7 +266,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast #endif - if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, triggerType, watermark, &pStream->physicalPlan)) { + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, pStream->trigger, pStream->watermark, &pStream->physicalPlan)) { mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); return -1; } @@ -391,7 +389,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq streamObj.smaId = 0; /*streamObj.physicalPlan = "";*/ streamObj.trigger = pCreate->triggerType; - streamObj.waterMark = pCreate->watermark; + streamObj.watermark = pCreate->watermark; + streamObj.triggerParam = pCreate->maxDelay; if (streamObj.targetSTbName[0]) { pDb = mndAcquireDbByStb(pMnode, streamObj.targetSTbName); @@ -409,7 +408,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -566,7 +565,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetSTbName, true); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pStream->waterMark, false); + colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9f34ae39c06fe284d4c2402fea04b703f70adf4c..7d2dfeeba54a6438d3994cd5a9957a1345f53d20 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -375,6 +375,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL; + pTask->pMsgCb = &pTq->pVnode->msgCb; + // exec if (pTask->execType != TASK_EXEC__NONE) { // expand runners @@ -406,9 +408,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols); ASSERT(pTask->tbSink.pTSchema); } + + streamSetupTrigger(pTask); + tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, pTq->pVnode->config.vgId); - taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); + taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); return 0; FAIL: @@ -431,7 +436,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { while (1) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; - SStreamTask* pTask = (SStreamTask*)pIter; + SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue; if (!failed) { @@ -439,7 +444,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { continue; } - if (streamTriggerByWrite(pTask, pTq->pVnode->config.vgId, &pTq->pVnode->msgCb) < 0) { + if (streamLaunchByWrite(pTask, pTq->pVnode->config.vgId, &pTq->pVnode->msgCb) < 0) { continue; } } else { @@ -459,7 +464,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb); return 0; } @@ -473,7 +478,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); int32_t taskId = req.taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SRpcMsg rsp = { .info = pMsg->info, .code = 0, @@ -485,7 +490,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); return 0; } @@ -493,7 +498,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp); return 0; } @@ -501,7 +506,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); streamProcessRecoverRsp(pTask, pRsp); return 0; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fd62849e56805c22472a5ea438140ec655e20df0..bebfc75f176b147c5bf9af517015a13d10740466 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -19,7 +19,8 @@ #include "tdatablock.h" #include "vnode.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, + char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -43,6 +44,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu if (pInfo->blockType == 0) { pInfo->blockType = type; } else if (pInfo->blockType != type) { + ASSERT(0); return TSDB_CODE_QRY_APP_ERROR; } @@ -51,7 +53,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } - } else { + } else if (type == STREAM_DATA_TYPE_SSDATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; @@ -62,6 +64,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); } + } else { + ASSERT(0); } return TSDB_CODE_SUCCESS; @@ -83,7 +87,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo)); + int32_t code = + doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { @@ -162,7 +167,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo pInfo = pInfo->pDownstream[0]; } - int32_t code = 0; + int32_t code = 0; SStreamBlockScanInfo* pScanInfo = pInfo->info; if (isAdd) { // add new table id SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList); @@ -178,9 +183,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo return code; } -int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion) { +int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, + int32_t* tversion) { ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; *sversion = pTaskInfo->schemaVer.sversion; *tversion = pTaskInfo->schemaVer.tversion; @@ -196,4 +202,4 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab } return 0; -} \ No newline at end of file +} diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 833f6db0c30ccadf4bc0342a5873cdd22a21a9b3..62a0c663b67972d4beed03fb52753cdf0cce068f 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1682,7 +1682,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "top", .type = FUNCTION_TYPE_TOP, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, @@ -1717,7 +1717,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "bottom", .type = FUNCTION_TYPE_BOTTOM, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 27746fff783182999b4a5780387fa48b04a564d9..8aaf4953def5d8fcb1fe354124c28f595649d8e1 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -23,6 +23,13 @@ extern "C" { #endif +typedef struct { + int8_t inited; + void* timer; +} SStreamGlobalEnv; + +static SStreamGlobalEnv streamEnv; + int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d3828d9ee446f31676e4b0fb4f5ec7c125682d66..6dcbfad957519fc50242da5d4b8d8d6f9add5ddc 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -14,8 +14,74 @@ */ #include "streamInc.h" +#include "ttimer.h" -int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { +int32_t streamInit() { + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2); + if (old != 2) break; + } + + if (old == 0) { + streamEnv.timer = taosTmrInit(10000, 100, 10000, "STREAM"); + if (streamEnv.timer == NULL) { + atomic_store_8(&streamEnv.inited, 0); + return -1; + } + atomic_store_8(&streamEnv.inited, 1); + } + return 0; +} + +void streamCleanUp() { + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2); + if (old != 2) break; + } + + if (old == 1) { + taosTmrCleanUp(streamEnv.timer); + atomic_store_8(&streamEnv.inited, 0); + } +} + +void streamTriggerByTimer(void* param, void* tmrId) { + SStreamTask* pTask = (void*)param; + + if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { + SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM); + if (trigger == NULL) return; + trigger->type = STREAM_INPUT__TRIGGER; + trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (trigger->pBlock == NULL) { + taosFreeQitem(trigger); + return; + } + trigger->pBlock->info.type = STREAM_GET_ALL; + + atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); + + streamTaskInput(pTask, (SStreamQueueItem*)trigger); + streamLaunchByWrite(pTask, pTask->nodeId, pTask->pMsgCb); + } + + taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); +} + +int32_t streamSetupTrigger(SStreamTask* pTask) { + if (pTask->triggerParam != 0) { + if (streamInit() < 0) { + return -1; + } + pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); + pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; + } + return 0; +} + +int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { int8_t execStatus = atomic_load_8(&pTask->status); if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fe1a85774316068d7691a41d7c2a4ff2ae59e20e..04428136ae614c109f173436d1f7203365bb080d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,15 +20,17 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) void* exec = pTask->exec.executor; // set input - if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { + SStreamQueueItem* pItem = (SStreamQueueItem*)data; + if (pItem->type == STREAM_INPUT__TRIGGER) { + SStreamTrigger* pTrigger = (SStreamTrigger*)data; + qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_DATA_TYPE_SSDATA_BLOCK, false); + } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; - ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT); - + ASSERT(pTask->inputType == STREAM_INPUT__DATA_SUBMIT); qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); - } else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) { + } else if (pItem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - + ASSERT(pTask->inputType == STREAM_INPUT__DATA_BLOCK); SArray* blocks = pBlock->blocks; qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5d8546bcb9c0b4815b8333f320479a04134e3082..7a7d9d15ad85c7026381283b265bb72b81f32c75 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -72,6 +72,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; /*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ } + if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; /*tEndEncode(pEncoder);*/ return pEncoder->pos; @@ -121,6 +122,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { /*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; } + if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1; /*tEndDecode(pDecoder);*/ return 0; diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 2357f760d15bcc86ce6b74e770fab51715b0549e..ec90e5a9b9493d541d365cd39bb3a80a52a7d697 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -174,7 +174,11 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) } void* taosArrayAddAll(SArray* pArray, const SArray* pInput) { - return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput)); + if (pInput) { + return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput)); + } else { + return NULL; + } } void* taosArrayPop(SArray* pArray) {