From 93347bec42e7a0525747658133cf129ff5fc0921 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 23 Mar 2022 10:44:32 +0800 Subject: [PATCH] put next level info into task --- example/src/tstream.c | 4 +-- include/common/tmsg.h | 36 ++++++++++++---------- source/common/src/tglobal.c | 5 ++- source/common/src/tmsg.c | 17 ++++++---- source/dnode/mnode/impl/src/mndScheduler.c | 36 +++++++++++++++++----- source/dnode/vnode/src/inc/tqInt.h | 1 + source/dnode/vnode/src/inc/vnd.h | 4 +-- source/dnode/vnode/src/tq/tq.c | 21 +++++++++++-- source/dnode/vnode/src/vnd/vnodeMain.c | 5 +-- 9 files changed, 90 insertions(+), 39 deletions(-) diff --git a/example/src/tstream.c b/example/src/tstream.c index 62d94b041d..56650634c5 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -63,7 +63,7 @@ int32_t init_env() { } int32_t create_stream() { - printf("create topic\n"); + printf("create stream\n"); TAOS_RES* pRes; TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -77,7 +77,7 @@ int32_t create_stream() { } taos_free_result(pRes); - const char* sql = "select ts,k from tu1"; + const char* sql = "select ts,sum(k) from tu1"; pRes = tmq_create_stream(pConn, "stream1", "out1", sql); if (taos_errno(pRes) != 0) { printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 560490569a..5d01f1cfe7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -23,8 +23,8 @@ #include "tencode.h" #include "thash.h" #include "tlist.h" -#include "trow.h" #include "tname.h" +#include "trow.h" #include "tuuid.h" #ifdef __cplusplus @@ -472,10 +472,9 @@ typedef struct { int32_t code; } SQueryTableRsp; -int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); - -int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); +int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); +int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); typedef struct { char db[TSDB_DB_FNAME_LEN]; @@ -888,14 +887,14 @@ typedef struct { } SRetrieveTableRsp; typedef struct { - int64_t handle; - int64_t useconds; - int8_t completed; // all results are returned to client - int8_t precision; - int8_t compressed; - int32_t compLen; - int32_t numOfRows; - char data[]; + int64_t handle; + int64_t useconds; + int8_t completed; // all results are returned to client + int8_t precision; + int8_t compressed; + int32_t compLen; + int32_t numOfRows; + char data[]; } SRetrieveMetaTableRsp; typedef struct { @@ -1405,12 +1404,11 @@ int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); typedef struct { - SArray* rspList; // SArray + SArray* rspList; // SArray } SVCreateTbBatchRsp; -int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); -int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); - +int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); +int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); typedef struct { int64_t ver; @@ -2292,6 +2290,11 @@ enum { STREAM_TASK_STATUS__STOP, }; +enum { + STREAM_NEXT_OP_DST__VND = 1, + STREAM_NEXT_OP_DST__SND, +}; + typedef struct { void* inputHandle; void* executor; @@ -2306,6 +2309,7 @@ typedef struct { int8_t pipeSink; int8_t numOfRunners; int8_t parallelizable; + int8_t nextOpDst; // vnode or snode SEpSet NextOpEp; char* qmsg; // not applied to encoder and decoder diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 841824a7c7..506e017deb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -132,6 +132,9 @@ bool tsdbForceKeepFile = false; int32_t tsDiskCfgNum = 0; SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0}; +// stream scheduler +bool tsStreamSchedV = true; + /* * minimum scale for whole system, millisecond by default * for TSDB_TIME_PRECISION_MILLI: 86400000L @@ -585,4 +588,4 @@ void taosCfgDynamicOptions(const char *option, const char *value) { taosResetLog(); cfgDumpCfg(tsCfg, 1, false); } -} \ No newline at end of file +} diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 47872b89d5..e493c651fa 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2629,7 +2629,7 @@ int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2656,13 +2656,13 @@ int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchR if (tStartEncode(&encoder) < 0) return -1; if (pRsp->rspList) { int32_t num = taosArrayGetSize(pRsp->rspList); - if (tEncodeI32(&encoder, num) < 0) return -1; + if (tEncodeI32(&encoder, num) < 0) return -1; for (int32_t i = 0; i < num; ++i) { SVCreateTbRsp *rsp = taosArrayGet(pRsp->rspList, i); - if (tEncodeI32(&encoder, rsp->code) < 0) return -1; + if (tEncodeI32(&encoder, rsp->code) < 0) return -1; } } else { - if (tEncodeI32(&encoder, 0) < 0) return -1; + if (tEncodeI32(&encoder, 0) < 0) return -1; } tEndEncode(&encoder); @@ -2672,7 +2672,7 @@ int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchR } int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) { - SCoder decoder = {0}; + SCoder decoder = {0}; int32_t num = 0; tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); @@ -2695,7 +2695,6 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc return 0; } - int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { int32_t tlen = 0; @@ -2797,7 +2796,10 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->pipeSource) < 0) return -1; if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1; // if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; @@ -2811,7 +2813,10 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->pipeSource) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1; // if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index b95574ea41..a6d27ffa44 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -32,6 +32,8 @@ #include "tname.h" #include "tuuid.h" +extern bool tsStreamSchedV; + int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type) { SCoder encoder; tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); @@ -106,6 +108,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); + int32_t lastUsedVgId = 0; for (int32_t level = 0; level < totLevel; level++) { SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask)); @@ -125,11 +128,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { continue; } + lastUsedVgId = pVgroup->vgId; pStream->vgNum++; // send to vnode SStreamTask* pTask = streamTaskNew(pStream->uid, level); + pTask->pipeSource = 1; pTask->pipeSink = level == totLevel - 1 ? 1 : 0; + pTask->parallelizable = 1; // TODO: set to if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); @@ -140,19 +146,35 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } } else { SStreamTask* pTask = streamTaskNew(pStream->uid, level); + pTask->pipeSource = 0; pTask->pipeSink = level == totLevel - 1 ? 1 : 0; - SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); - if (pSnode != NULL) { - if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { - sdbRelease(pSdb, pSnode); + pTask->parallelizable = plan->type == SUBPLAN_TYPE_SCAN; + pTask->nextOpDst = STREAM_NEXT_OP_DST__VND; + + if (tsStreamSchedV) { + ASSERT(lastUsedVgId != 0); + SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId); + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) { + sdbRelease(pSdb, pVg); qDestroyQueryPlan(pPlan); return -1; } - sdbRelease(pMnode->pSdb, pSnode); + sdbRelease(pSdb, pVg); } else { - // TODO: assign to one vg - ASSERT(0); + SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); + if (pSnode != NULL) { + if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { + sdbRelease(pSdb, pSnode); + qDestroyQueryPlan(pPlan); + return -1; + } + sdbRelease(pMnode->pSdb, pSnode); + } else { + // TODO: assign to one vg + ASSERT(0); + } } + taosArrayPush(taskOneLevel, pTask); } taosArrayPush(pStream->tasks, taskOneLevel); diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index 4d4bb12a21..deb3cae617 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -167,6 +167,7 @@ struct STQ { STqMetaStore* tqMeta; STqPushMgr* tqPushMgr; SHashObj* pStreamTasks; + SVnode* pVnode; SWal* pWal; SMeta* pVnodeMeta; }; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index ed9aad9277..bad5b7c6a2 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -177,7 +177,8 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, + SMemAllocatorFactory* allocFac); void tqClose(STQ*); // required by vnode @@ -188,7 +189,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); - int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3af79ca461..0615ea31d4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -21,7 +21,8 @@ int32_t tqInit() { return tqPushMgrInit(); } void tqCleanUp() { tqPushMgrCleanUp(); } -STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, + SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -29,6 +30,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S } pTq->path = strdup(path); pTq->tqConfig = tqConfig; + pTq->pVnode = pVnode; pTq->pWal = pWal; pTq->pVnodeMeta = pVnodeMeta; #if 0 @@ -104,8 +106,21 @@ int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { } void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead)); tEncodeDataBlocks(abuf, pRes); - // serialize - // to next level + tmsg_t type; + + if (pTask->nextOpDst == STREAM_NEXT_OP_DST__VND) { + type = TDMT_VND_TASK_EXEC; + } else { + type = TDMT_SND_TASK_EXEC; + } + + SRpcMsg msg = { + .pCont = buf, + .contLen = tlen, + .code = 0, + .msgType = type, + }; + /*vnodeSendReq(pTq->pVnode, &pTask->NextOpEp, &msg);*/ } } diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index 86e670d533..70f4117976 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -115,7 +115,8 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open tsdb sprintf(dir, "%s/tsdb", pVnode->path); - pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); + pVnode->pTsdb = + tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); if (pVnode->pTsdb == NULL) { // TODO: handle error return -1; @@ -131,7 +132,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open TQ sprintf(dir, "%s/tq", pVnode->path); - pVnode->pTq = tqOpen(dir, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); if (pVnode->pTq == NULL) { // TODO: handle error return -1; -- GitLab