diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 26793cc0537f3560cb739fa09ad426ac5d06926a..b43ea77271c05c758f51b1920083aacfb9f06c16 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1167,7 +1167,7 @@ typedef struct { typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; - char outputTbName[TSDB_TABLE_NAME_LEN]; + char outputSTbName[TSDB_TABLE_FNAME_LEN]; int8_t igExists; char* sql; char* ast; @@ -1975,7 +1975,7 @@ typedef struct { int32_t tagsFilterLen; // strlen + 1 int32_t sqlLen; // strlen + 1 int32_t astLen; // strlen + 1 - char* expr; + char* expr; char* tagsFilter; char* sql; char* ast; @@ -1997,9 +1997,9 @@ typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX int8_t slidingUnit; // MACRO: TIME_UNIT_XXX - int8_t timezoneInt; // sma data expired if timezone changes. + int8_t timezoneInt; // sma data expired if timezone changes. char indexName[TSDB_INDEX_NAME_LEN]; - char timezone[TD_TIMEZONE_LEN]; + char timezone[TD_TIMEZONE_LEN]; int32_t exprLen; int32_t tagsFilterLen; int64_t indexUid; @@ -2371,6 +2371,26 @@ enum { STREAM_NEXT_OP_DST__SND, }; +enum { + STREAM_SOURCE_TYPE__NONE = 1, + STREAM_SOURCE_TYPE__SUPER, + STREAM_SOURCE_TYPE__CHILD, + STREAM_SOURCE_TYPE__NORMAL, +}; + +enum { + STREAM_SINK_TYPE__NONE = 1, + STREAM_SINK_TYPE__INPLACE, + STREAM_SINK_TYPE__ASSIGNED, + STREAM_SINK_TYPE__MULTIPLE, + STREAM_SINK_TYPE__TEMPORARY, +}; + +enum { + STREAM_TYPE__NORMAL = 1, + STREAM_TYPE__SMA, +}; + typedef struct { void* inputHandle; void* executor; @@ -2381,28 +2401,33 @@ typedef struct { int32_t taskId; int32_t level; int8_t status; - int8_t pipeSource; - int8_t pipeSink; - int8_t numOfRunners; int8_t parallelizable; - int8_t nextOpDst; // vnode or snode + + // vnode or snode + int8_t nextOpDst; + + int8_t sourceType; + int8_t sinkType; + + // for sink type assigned + int32_t sinkVgId; SEpSet NextOpEp; - char* qmsg; - // not applied to encoder and decoder + + // executor meta info + char* qmsg; + + // followings are not applied to encoder and decoder + int8_t numOfRunners; SStreamRunner runner[8]; - // void* executor; - // void* stateStore; - // storage handle } SStreamTask; -static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level) { +static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId) { SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return NULL; } pTask->taskId = tGenIdPI32(); pTask->streamId = streamId; - pTask->level = level; pTask->status = STREAM_TASK_STATUS__RUNNING; pTask->qmsg = NULL; return pTask; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index d05ad06a66666f576d68771d0b3cf7686f85111a..49fe7aa65362d1611ba9d599daf6e082de4a8801 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -505,7 +505,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa .sql = (char*)sql, }; tNameExtractFullName(&name, req.name); - strcpy(req.outputTbName, tbName); + strcpy(req.outputSTbName, tbName); int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req); void* buf = malloc(tlen); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d2add0bbcc93533fcdda5a2977f5116ca2432d51..e860fc183149567c8b5514d7218857a76fbd0ee0 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -314,7 +314,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); } - if(pReq->rollup && pReq->stbCfg.pRSmaParam) { + if (pReq->rollup && pReq->stbCfg.pRSmaParam) { SRSmaParam *param = pReq->stbCfg.pRSmaParam; tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedI8(buf, param->delayUnit); @@ -341,7 +341,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) { tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pBSmaCols[i]); } - if(pReq->rollup && pReq->ntbCfg.pRSmaParam) { + if (pReq->rollup && pReq->ntbCfg.pRSmaParam) { SRSmaParam *param = pReq->ntbCfg.pRSmaParam; tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedI8(buf, param->delayUnit); @@ -427,7 +427,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); } buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols)); - if(pReq->ntbCfg.nBSmaCols > 0) { + if (pReq->ntbCfg.nBSmaCols > 0) { pReq->ntbCfg.pBSmaCols = (col_id_t *)malloc(pReq->ntbCfg.nBSmaCols * sizeof(col_id_t)); for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) { buf = taosDecodeFixedI16(buf, pReq->ntbCfg.pBSmaCols + i); @@ -435,10 +435,10 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { } else { pReq->ntbCfg.pBSmaCols = NULL; } - if(pReq->rollup) { + if (pReq->rollup) { pReq->ntbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); SRSmaParam *param = pReq->ntbCfg.pRSmaParam; - buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor); + buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor); buf = taosDecodeFixedI8(buf, ¶m->delayUnit); buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); if (param->nFuncIds > 0) { @@ -3045,7 +3045,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->outputSTbName) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1; @@ -3068,7 +3068,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->outputTbName) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->outputSTbName) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1; @@ -3101,12 +3101,14 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->pipeSource) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1; if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1; if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1; - // if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; + if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) { + if (tEncodeI32(pEncoder, pTask->sinkVgId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; + } if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; /*tEndEncode(pEncoder);*/ return pEncoder->pos; @@ -3118,12 +3120,14 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->pipeSource) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1; - // if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; + if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) { + if (tDecodeI32(pDecoder, &pTask->sinkVgId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; + } if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; /*tEndDecode(pDecoder);*/ return 0; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 6e27f856ca9b7d9b84cc494ac11c94833cdfc428..0d65dacb20601bac3693b71c6f5b0e4e558f640a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -720,6 +720,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; + char outputSTbName[TSDB_TABLE_FNAME_LEN]; int64_t createTime; int64_t updateTime; int64_t uid; @@ -735,7 +736,6 @@ typedef struct { char* physicalPlan; SArray* tasks; // SArray> SArray* ColAlias; - char* outputSTbName; } SStreamObj; int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 1bac8d678a650119f464c8720f8901318ef539eb..6fa926d548a1955cdb396b914101c1a5b7cbb113 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -16,6 +16,7 @@ #include "mndDef.h" int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { + int32_t sz = 0; int32_t outputNameSz = 0; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1; @@ -30,19 +31,18 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; // TODO encode tasks if (pObj->tasks) { - int32_t sz = taosArrayGetSize(pObj->tasks); - tEncodeI32(pEncoder, sz); - for (int32_t i = 0; i < sz; i++) { - SArray *pArray = taosArrayGet(pObj->tasks, i); - int32_t innerSz = taosArrayGetSize(pArray); - tEncodeI32(pEncoder, innerSz); - for (int32_t j = 0; j < innerSz; j++) { - SStreamTask *pTask = taosArrayGet(pArray, j); - tEncodeSStreamTask(pEncoder, pTask); - } + sz = taosArrayGetSize(pObj->tasks); + } + if (tEncodeI32(pEncoder, sz) < 0) return -1; + + for (int32_t i = 0; i < sz; i++) { + SArray *pArray = taosArrayGet(pObj->tasks, i); + int32_t innerSz = taosArrayGetSize(pArray); + if (tEncodeI32(pEncoder, innerSz) < 0) return -1; + for (int32_t j = 0; j < innerSz; j++) { + SStreamTask *pTask = taosArrayGet(pArray, j); + if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1; } - } else { - tEncodeI32(pEncoder, 0); } if (pObj->ColAlias != NULL) { @@ -68,6 +68,7 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; + pObj->tasks = NULL; int32_t sz; if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (sz != 0) { @@ -83,14 +84,14 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { } taosArrayPush(pObj->tasks, pArray); } - } else { - pObj->tasks = NULL; } int32_t outputNameSz; if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1; - pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *)); - if (pObj->ColAlias == NULL) { - return -1; + if (outputNameSz != 0) { + pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *)); + if (pObj->ColAlias == NULL) { + return -1; + } } for (int32_t i = 0; i < outputNameSz; i++) { char *name; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 8ccfd6be5d4cf170d69ffacec46574f4c5abd5cf..870cbba9793bcbfe50076bd66fca2164d8eb86ca 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -58,7 +58,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet action.contLen = tlen; action.msgType = type; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - rpcFreeCont(buf); + free(buf); return -1; } return 0; @@ -131,13 +131,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { lastUsedVgId = pVgroup->vgId; pStream->vgNum++; - // send to vnode - SStreamTask* pTask = streamTaskNew(pStream->uid, level); - pTask->pipeSource = 1; - pTask->pipeSink = level == totLevel - 1 ? 1 : 0; + SStreamTask* pTask = streamTaskNew(pStream->uid); + pTask->level = level; + pTask->sourceType = 1; + pTask->sinkType = level == totLevel - 1 ? 1 : 0; pTask->parallelizable = 1; - // TODO: set to if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); @@ -146,13 +145,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { taosArrayPush(taskOneLevel, pTask); } } else { - SStreamTask* pTask = streamTaskNew(pStream->uid, level); - pTask->pipeSource = 0; - pTask->pipeSink = level == totLevel - 1 ? 1 : 0; + SStreamTask* pTask = streamTaskNew(pStream->uid); + pTask->level = level; + pTask->sourceType = 0; + pTask->sinkType = level == totLevel - 1 ? 1 : 0; pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN; pTask->nextOpDst = STREAM_NEXT_OP_DST__VND; - if (tsStreamSchedV) { + SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); + if (pSnode == NULL || tsStreamSchedV) { ASSERT(lastUsedVgId != 0); SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId); if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) { @@ -162,24 +163,19 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } sdbRelease(pSdb, pVg); } else { - SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); - if (pSnode != NULL) { - if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { - sdbRelease(pSdb, pSnode); - qDestroyQueryPlan(pPlan); - return -1; - } - sdbRelease(pMnode->pSdb, pSnode); - } else { - // TODO: assign to one vg - ASSERT(0); + if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { + sdbRelease(pSdb, pSnode); + qDestroyQueryPlan(pPlan); + return -1; } } + sdbRelease(pMnode->pSdb, pSnode); taosArrayPush(taskOneLevel, pTask); } taosArrayPush(pStream->tasks, taskOneLevel); } + qDestroyQueryPlan(pPlan); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 59bc3f00c02ce004439d743fc1a7481f211ca09a..bb7891ad2d052bb744230035d606523abe29a74f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -272,6 +272,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast if (nodesStringToNode(ast, &pAst) < 0) { return -1; } +#if 1 SArray *names = mndExtractNamesFromAst(pAst); printf("|"); for (int i = 0; i < taosArrayGetSize(names); i++) { @@ -280,6 +281,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast printf("\n=======================================================\n"); pStream->ColAlias = names; +#endif if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); @@ -308,6 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe SStreamObj streamObj = {0}; tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN); + tstrncpy(streamObj.outputSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN); streamObj.createTime = taosGetTimestampMs(); streamObj.updateTime = streamObj.createTime; streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 76687fc5cc0b9359257845b7087d3ad18ea44aab..c4d389a379d9e71c28d6d0c7b274c9bfe5602ca6 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -358,9 +358,7 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; } -int32_t mndStart(SMnode *pMnode) { - return mndInitTimer(pMnode); -} +int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); } int32_t mndProcessMsg(SNodeMsg *pMsg) { SMnode *pMnode = pMsg->pNode; @@ -415,11 +413,11 @@ int64_t mndGenerateUid(char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); do { - int64_t us = taosGetTimestampUs(); + int64_t us = taosGetTimestampUs(); int64_t x = (us & 0x000000FFFFFFFFFF) << 24; int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); if (uuid) { - return abs(uuid); + return llabs(uuid); } } while (true); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 21da0649e252ad9aea3a488a355083d5e5dc1590..43554f923ea50658ba823c27c07de1fa9a21ccba 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -561,7 +561,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = (SStreamTask*)pIter; - if (!pTask->pipeSource) continue; + if (!pTask->sourceType) continue; int32_t workerId = 0; void* exec = pTask->runner[workerId].executor; @@ -578,7 +578,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { } taosArrayPush(pRes, output); } - if (pTask->pipeSink) { + if (pTask->sinkType) { // write back /*printf("reach end\n");*/ tqDebugShowSSData(pRes);