diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index e6ddc1e5b50b5668a6c651011054111333ee75e2..20d743046f62c69349c0a27c6b2fda826ffcecff 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -138,7 +138,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); void* blockDataDestroy(SSDataBlock* pBlock); -void blockDebugShowData(SArray* dataBlocks); +void blockDebugShowData(const SArray* dataBlocks); #ifdef __cplusplus } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7200e78a418961100b2145f4859ddb2882cd8fa3..177fe39397b12efdf3ffd221bf69932ffb0d15ac 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -30,16 +30,18 @@ enum { STREAM_TASK_STATUS__STOP, }; +#if 0 // pipe -> fetch/pipe queue // merge -> merge queue // write -> write queue enum { - TASK_SINK_MSG__SND_PIPE = 1, - TASK_SINK_MSG__SND_MERGE, - TASK_SINK_MSG__VND_PIPE, - TASK_SINK_MSG__VND_MERGE, - TASK_SINK_MSG__VND_WRITE, + TASK_DISPATCH_MSG__SND_PIPE = 1, + TASK_DISPATCH_MSG__SND_MERGE, + TASK_DISPATCH_MSG__VND_PIPE, + TASK_DISPATCH_MSG__VND_MERGE, + TASK_DISPATCH_MSG__VND_WRITE, }; +#endif typedef struct { int32_t nodeId; // 0 for snode @@ -93,13 +95,14 @@ typedef struct { enum { TASK_SOURCE__SCAN = 1, - TASK_SOURCE__SINGLE, - TASK_SOURCE__MULTI, + TASK_SOURCE__PIPE, + TASK_SOURCE__MERGE, }; enum { TASK_EXEC__NONE = 1, - TASK_EXEC__EXEC, + TASK_EXEC__PIPE, + TASK_EXEC__MERGE, }; enum { @@ -129,6 +132,9 @@ typedef struct { int16_t dispatchMsgType; int32_t downstreamTaskId; + int32_t nodeId; + SEpSet epSet; + // source preprocess // exec diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index afdd8f155fd747900ff9a7ff2b49746eb12f7110..4fb9bc1d045cf3b4018f874c365ca97cc01eb228 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1380,7 +1380,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { return buf; } -void blockDebugShowData(SArray* dataBlocks) { +void blockDebugShowData(const SArray* dataBlocks) { char pBuf[128]; int32_t sz = taosArrayGetSize(dataBlocks); for (int32_t i = 0; i < sz; i++) { @@ -1398,13 +1398,17 @@ void blockDebugShowData(SArray* dataBlocks) { printf(" %25s |", pBuf); break; case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: printf(" %15d |", *(int32_t*)var); break; + case TSDB_DATA_TYPE_UINT: + printf(" %15u |", *(uint32_t*)var); + break; case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: printf(" %15ld |", *(int64_t*)var); break; + case TSDB_DATA_TYPE_UBIGINT: + printf(" %15lu |", *(uint64_t*)var); + break; } } printf("\n"); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 179a50c4226a635811fe9d6b27755138c9810f0e..a1bef49cc6661705b7f51d400a9d553d45c4a3c9 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -122,7 +122,7 @@ bool tsRetrieveBlockingModel = 0; // last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name bool tsKeepOriginalColumnName = 0; -// long query death-lock +// kill long query bool tsDeadLockKillQuery = 0; // tsdb config diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 4595eeadc978f74f85757f136f12e8268a9aa67a..509e9ea0921f5501aefc12d3f8750f581e0b9730 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -66,8 +66,11 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t msgLen; + pTask->nodeId = pVgroup->vgId; + pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); + plan->execNode.nodeId = pVgroup->vgId; - plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); + plan->execNode.epSet = pTask->epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -86,8 +89,12 @@ SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) { int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) { int32_t msgLen; - plan->execNode.nodeId = pSnode->id; - plan->execNode.epSet = mndAcquireEpFromSnode(pMnode, pSnode); + + pTask->nodeId = 0; + pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode); + + plan->execNode.nodeId = 0; + plan->execNode.epSet = pTask->epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -97,9 +104,23 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, return 0; } +SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { + void* pIter = NULL; + SVgObj* pVgroup = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != dbUid) { + sdbRelease(pMnode->pSdb, pVgroup); + continue; + } + return pVgroup; + } + return pVgroup; +} + int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; - SVgObj* pVgroup = NULL; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -108,80 +129,144 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ASSERT(pStream->vgNum == 0); int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); - pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); - int32_t lastUsedVgId = 0; + ASSERT(totLevel <= 2); + pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); - // gather vnodes - // gather snodes - // iterate plan, expand source to vnodes and assign ep to each task - // iterate tasks, assign sink type and sink ep to each task - - for (int32_t revLevel = totLevel - 1; revLevel >= 0; revLevel--) { - int32_t level = totLevel - 1 - revLevel; - SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask)); - SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, revLevel); - int32_t opNum = LIST_LENGTH(inner->pNodeList); - ASSERT(opNum == 1); + for (int32_t level = 0; level < totLevel; level++) { + SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level); + ASSERT(LIST_LENGTH(inner->pNodeList) == 1); SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); - if (level == 0) { + + // if (level == totLevel - 1 /* or no snode */) { + if (level == totLevel - 1) { + // last level, source, must assign to vnode + // must be scan type ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); + + // replicate task to each vnode void* pIter = NULL; while (1) { + SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) break; if (pVgroup->dbUid != pStream->dbUid) { sdbRelease(pSdb, pVgroup); continue; } + SStreamTask* pTask = tNewSStreamTask(pStream->uid); + // source part + pTask->sourceType = TASK_SOURCE__SCAN; - lastUsedVgId = pVgroup->vgId; - pStream->vgNum++; + // sink part + if (level == 0) { + // only for inplace + pTask->sinkType = TASK_SINK__SHOW; + pTask->showSink.reserved = 0; + } else { + pTask->sinkType = TASK_SINK__NONE; + } - SStreamTask* pTask = tNewSStreamTask(pStream->uid); - /*pTask->level = level;*/ - // TODO - /*pTask->sourceType = STREAM_SOURCE__SUPER;*/ - /*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/ + // dispatch part + if (level == 0) { + pTask->dispatchType = TASK_DISPATCH__NONE; + // if inplace sink, no dispatcher + // if fixed ep, add fixed ep dispatcher + // if shuffle, add shuffle dispatcher + } else { + // add fixed ep dispatcher + int32_t lastLevel = level - 1; + ASSERT(lastLevel == 0); + SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel); + // one merge only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, lastLevel); + pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC; + pTask->dispatchType = TASK_DISPATCH__FIXED; + + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; + } + + // exec part + pTask->execType = TASK_EXEC__PIPE; pTask->exec.parallelizable = 1; if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; } - taosArrayPush(taskOneLevel, pTask); + sdbRelease(pSdb, pVgroup); + taosArrayPush(taskOneLevel, &pTask); } } else { + // merge plan + + // TODO if has snode, assign to snode + + // else, assign to vnode + ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); SStreamTask* pTask = tNewSStreamTask(pStream->uid); - /*pTask->level = level;*/ - /*pTask->sourceType = STREAM_SOURCE__NONE;*/ - /*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/ - pTask->exec.parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN; - - SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); - if (pSnode == NULL || tsStreamSchedV) { - ASSERT(lastUsedVgId != 0); - SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId); - if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) { - sdbRelease(pSdb, pVg); - qDestroyQueryPlan(pPlan); - return -1; - } - sdbRelease(pSdb, pVg); - } else { - if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { - sdbRelease(pSdb, pSnode); - qDestroyQueryPlan(pPlan); - return -1; - } + + // source part, currently only support multi source + pTask->sourceType = TASK_SOURCE__PIPE; + + // sink part + pTask->sinkType = TASK_SINK__SHOW; + /*pTask->sinkType = TASK_SINK__NONE;*/ + + // dispatch part + pTask->dispatchType = TASK_DISPATCH__SHUFFLE; + pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + + // exec part + pTask->execType = TASK_EXEC__MERGE; + pTask->exec.parallelizable = 0; + SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid); + ASSERT(pVgroup); + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } + sdbRelease(pSdb, pVgroup); + taosArrayPush(taskOneLevel, &pTask); + } + + taosArrayPush(pStream->tasks, &taskOneLevel); + } + + if (totLevel == 2) { + void* pIter = NULL; + while (1) { + SVgObj* pVgroup; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pStream->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; } - sdbRelease(pMnode->pSdb, pSnode); + SStreamTask* pTask = tNewSStreamTask(pStream->uid); - taosArrayPush(taskOneLevel, pTask); + // source part + pTask->sourceType = TASK_SOURCE__MERGE; + + // sink part + pTask->sinkType = TASK_SINK__SHOW; + + // dispatch part + pTask->dispatchType = TASK_DISPATCH__NONE; + + // exec part + pTask->execType = TASK_EXEC__NONE; + pTask->exec.parallelizable = 0; } - taosArrayPush(pStream->tasks, taskOneLevel); } + + // free memory qDestroyQueryPlan(pPlan); + return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 18074fc5a80c4ed680671964deed5d79da6ecc83..816681bf24004b7e5986170391360c619e055696 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,12 +14,11 @@ */ #include "tcompare.h" +#include "tdatablock.h" #include "tqInt.h" #include "tqMetaStore.h" #include "tstream.h" -void blockDebugShowData(SArray* dataBlocks); - int32_t tqInit() { return tqPushMgrInit(); } void tqCleanUp() { tqPushMgrCleanUp(); } @@ -445,18 +444,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { if (pTask->execType == TASK_EXEC__NONE) return 0; pTask->exec.numOfRunners = parallel; + pTask->exec.runners = calloc(parallel, sizeof(SStreamRunner)); + if (pTask->exec.runners == NULL) { + return -1; + } for (int32_t i = 0; i < parallel; i++) { STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); SReadHandle handle = { .reader = pReadHandle, .meta = pTq->pVnodeMeta, }; - pTask->exec.runners = calloc(parallel, sizeof(SStreamRunner)); - if (pTask->exec.runners == NULL) { - return -1; - } pTask->exec.runners[i].inputHandle = pReadHandle; pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + ASSERT(pTask->exec.runners[i].executor); } return 0; } @@ -473,7 +473,10 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { } tCoderClear(&decoder); - tqExpandTask(pTq, pTask, 8); + if (tqExpandTask(pTq, pTask, 4) < 0) { + ASSERT(0); + } + taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); return 0; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 0cc4da2bd13522fe957ea4cd6a8c6297a47521be..e6ac8b240d9163284d5f0e4b5e016fc791b16536 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -22,10 +22,13 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0; // exec - if (pTask->execType == TASK_EXEC__EXEC) { + if (pTask->execType != TASK_EXEC__NONE) { ASSERT(workId < pTask->exec.numOfRunners); void* exec = pTask->exec.runners[workId].executor; pRes = taosArrayInit(0, sizeof(SSDataBlock)); + if (pRes == NULL) { + return -1; + } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { qSetStreamInput(exec, input, inputType); while (1) { @@ -79,7 +82,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in } // dispatch - if (pTask->dispatchType != TASK_DISPATCH__NONE) { + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { SStreamTaskExecReq req = { .streamId = pTask->streamId, .taskId = pTask->taskId, @@ -101,28 +104,54 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in .code = 0, .msgType = pTask->dispatchMsgType, }; - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - int32_t qType; - if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { - qType = FETCH_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || - pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { - qType = MERGE_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { - qType = WRITE_QUEUE; - } else { - ASSERT(0); - } - tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - ((SMsgHead*)buf)->vgId = pTask->fixedEpDispatcher.nodeId; - SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; - tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg); - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - // TODO + + int32_t qType; + if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { + qType = FETCH_QUEUE; + } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || + pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { + qType = MERGE_QUEUE; + } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { + qType = WRITE_QUEUE; } else { ASSERT(0); } + tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); + + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + SStreamTaskExecReq req = { + .streamId = pTask->streamId, + .taskId = pTask->taskId, + .data = pRes, + }; + + int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req); + void* buf = rpcMallocCont(tlen); + + if (buf == NULL) { + return -1; + } + + ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncodeSStreamTaskExecReq(&abuf, &req); + + SRpcMsg dispatchMsg = { + .pCont = buf, + .contLen = tlen, + .code = 0, + .msgType = pTask->dispatchMsgType, + }; + + SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; + + tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg); + + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + // TODO + + } else { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); } return 0; } @@ -168,7 +197,10 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1; - if (pTask->execType == TASK_EXEC__EXEC) { + if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; + + if (pTask->execType != TASK_EXEC__NONE) { if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; } @@ -203,7 +235,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1; - if (pTask->execType == TASK_EXEC__EXEC) { + if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; + + if (pTask->execType != TASK_EXEC__NONE) { if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; }