diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 73ba0f47b8d3003f6f924db86be3b2830444be4d..1b32dc26f5cf34cf0b84da409ee3c62dd0778696 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -46,6 +46,7 @@ typedef enum EStreamType { STREAM_INVALID, STREAM_GET_ALL, STREAM_DELETE, + STREAM_RETRIEVE, } EStreamType; typedef struct { diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index bb5e903d1e6391d2ec6c80216a2bbf9f1b715d73..ba707b8f5e5d983c84560b6600738380eb57ef54 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -200,15 +200,14 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_QND_MSG) - //shared by snode and vnode TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ee599e8498e60a00391b68eaf06f0a467e65c3ae..684a605e18039530a9b79777bbb1443c18f973a7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -77,7 +77,7 @@ typedef struct { typedef struct { int8_t type; - int32_t sourceVg; + int32_t srcVgId; int64_t sourceVer; SArray* blocks; // SArray @@ -145,11 +145,6 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit); SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit); -#if 0 -int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput); -void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput); -#endif - typedef struct { char* qmsg; // followings are not applicable to encoder and decoder @@ -234,6 +229,13 @@ enum { TASK_TRIGGER_STATUS__ACTIVE, }; +typedef struct { + int32_t nodeId; + int32_t childId; + int32_t taskId; + SEpSet epSet; +} SStreamChildEpInfo; + struct SStreamTask { int64_t streamId; int32_t taskId; @@ -247,13 +249,16 @@ struct SStreamTask { int8_t dispatchType; int16_t dispatchMsgType; - int8_t dataScan; + int8_t isDataScan; // node info - int32_t childId; + int32_t selfChildId; int32_t nodeId; SEpSet epSet; + // children info + SArray* childEpInfo; // SArray + // exec STaskExec exec; @@ -291,6 +296,9 @@ struct SStreamTask { SMsgCb* pMsgCb; }; +int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); +int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); + SStreamTask* tNewSStreamTask(int64_t streamId); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); @@ -369,9 +377,9 @@ typedef struct { typedef struct { int64_t streamId; int32_t taskId; - int32_t sourceTaskId; - int32_t sourceVg; - int32_t sourceChildId; + int32_t dataSrcVgId; + int32_t upstreamTaskId; + int32_t upstreamChildId; int32_t upstreamNodeId; #if 0 int64_t sourceVer; @@ -387,6 +395,23 @@ typedef struct { int8_t inputStatus; } SStreamDispatchRsp; +typedef struct { + int64_t streamId; + int32_t srcTaskId; + int32_t srcNodeId; + int32_t dstTaskId; + int32_t dstNodeId; + int32_t retrieveLen; + SRetrieveTableRsp* pRetrieve; +} SStreamRetrieveReq; + +typedef struct { + int64_t streamId; + int32_t childId; + int32_t rspFromTaskId; + int32_t rspToTaskId; +} SStreamRetrieveRsp; + typedef struct { int64_t streamId; int32_t taskId; @@ -401,6 +426,7 @@ typedef struct { } SStreamTaskRecoverRsp; int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); +int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); int32_t streamSetupTrigger(SStreamTask* pTask); @@ -411,6 +437,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp); int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); +int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); +int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index e1ffc3bdb7f2b9ce509fef20b4a4847ac91689c3..7cb41ca77cb77f7069ab8e375cef64840b058615 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -101,6 +101,8 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d9502ec8f34d73eab7c0870f15756349e9b167d4..00d83f0ad4d5632e66352014d52cfc56f7bd1bfc 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -359,6 +359,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 2b6258b10ab76772e69c7a6935c058a526e202f7..e055e3b2baecb75da527b18559e3e11f1b2cfa50 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -37,7 +37,7 @@ extern bool tsSchedStreamToSnode; static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); - pTask->childId = childId; + pTask->selfChildId = childId; taosArrayPush(pArray, &pTask); return 0; } @@ -270,6 +270,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb pTask->nodeId = pVgroup->vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); + pTask->isDataScan = 0; + // source pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; @@ -306,6 +308,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* } mndAddTaskToTaskSet(tasks, pTask); + ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId); + pTask->nodeId = pStream->fixedSinkVgId; #if 0 SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); @@ -315,6 +319,9 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); #endif pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); + + pTask->isDataScan = 0; + // source pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; @@ -384,6 +391,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pInnerTask = tNewSStreamTask(pStream->uid); mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); + + pInnerTask->isDataScan = 0; + + pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); + // input pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; @@ -446,7 +458,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SStreamTask* pTask = tNewSStreamTask(pStream->uid); mndAddTaskToTaskSet(taskSourceLevel, pTask); - pTask->dataScan = 1; + pTask->isDataScan = 1; // input pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; @@ -467,6 +479,20 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { qDestroyQueryPlan(pPlan); return -1; } + + SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); + if (pEpInfo == NULL) { + ASSERT(0); + terrno = TSDB_CODE_OUT_OF_MEMORY; + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } + pEpInfo->childId = pTask->selfChildId; + pEpInfo->epSet = pTask->epSet; + pEpInfo->nodeId = pTask->nodeId; + pEpInfo->taskId = pTask->taskId; + taosArrayPush(pInnerTask->childEpInfo, &pEpInfo); } } @@ -491,7 +517,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SStreamTask* pTask = tNewSStreamTask(pStream->uid); mndAddTaskToTaskSet(taskOneLevel, pTask); - pTask->dataScan = 1; + pTask->isDataScan = 1; // input pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index c2c4ea3a88e1b690cc6b94c9ee15575e8ec004e1..3a92cba7731576658a8fa9edc9402c2c0e0ab068 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -105,13 +105,14 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) { ASSERT(pTask->execType != TASK_EXEC__NONE); - ASSERT(pTask->dataScan == 0); + ASSERT(pTask->isDataScan == 0); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); ASSERT(pTask->exec.executor); streamSetupTrigger(pTask); - qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId); + qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, + pTask->selfChildId); taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *)); @@ -198,6 +199,34 @@ static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) { return code; } +static int32_t sndProcessTaskRetrieveReq(SSnode *pNode, SRpcMsg *pMsg) { + SStreamMeta *pMeta = pNode->pMeta; + + char *msgStr = pMsg->pCont; + char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamRetrieveReq req; + SDecoder decoder; + tDecoderInit(&decoder, msgBody, msgLen); + tDecodeStreamRetrieveReq(&decoder, &req); + int32_t taskId = req.dstTaskId; + SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t)); + if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + return 0; + } + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessRetrieveReq(pTask, &req, &rsp); + return 0; +} + +static int32_t sndProcessTaskRetrieveRsp(SSnode *pNode, SRpcMsg *pMsg) { + // + return 0; +} + int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { // stream deploy // stream stop/resume @@ -221,10 +250,14 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { return sndProcessTaskDispatchReq(pSnode, pMsg); case TDMT_STREAM_TASK_RECOVER: return sndProcessTaskRecoverReq(pSnode, pMsg); + case TDMT_STREAM_RETRIEVE: + return sndProcessTaskRecoverReq(pSnode, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return sndProcessTaskDispatchRsp(pSnode, pMsg); case TDMT_STREAM_TASK_RECOVER_RSP: return sndProcessTaskRecoverRsp(pSnode, pMsg); + case TDMT_STREAM_RETRIEVE_RSP: + return sndProcessTaskRecoverRsp(pSnode, pMsg); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index c0dfebb08f566e6a9e28d233c7dc7c8dd1a991ea..f8bdb63c861e23f483ebceaa4ad7459d23b134b6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -149,6 +149,8 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, const char* stbFullName, int32_t vgId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3ce02dc50ad61852ec84f0a0cc2da736610c0ff2..691548af7ad6bfd9142ddde87f860ddfc94f72da 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -441,7 +441,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { // exec if (pTask->execType != TASK_EXEC__NONE) { // expand runners - if (pTask->dataScan) { + if (pTask->isDataScan) { STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { .reader = pStreamReader, @@ -476,7 +476,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { streamSetupTrigger(pTask); - tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, TD_VID(pTq->pVnode)); + tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->selfChildId, TD_VID(pTq->pVnode)); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); @@ -616,3 +616,29 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { return code; #endif } + +int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamRetrieveReq req; + SDecoder decoder; + tDecoderInit(&decoder, msgBody, msgLen); + tDecodeStreamRetrieveReq(&decoder, &req); + int32_t taskId = req.dstTaskId; + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + return 0; + } + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessRetrieveReq(pTask, &req, &rsp); + return 0; +} + +int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { + // + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e764aba48e4188430cfc67c3f2844689b077b089..3dd636e5a8d67b3c0aeccb1ff6056932723e0ca7 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -265,10 +265,14 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RECOVER: return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); + case TDMT_STREAM_RETRIEVE: + return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RECOVER_RSP: return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_RETRIEVE_RSP: + return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 8aaf4953def5d8fcb1fe354124c28f595649d8e1..9654e21d24a3768a69d8452715fd56fac73b2f2d 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -33,8 +33,13 @@ static SStreamGlobalEnv streamEnv; int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); +int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); +int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock); + +int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 18a6e5bf77a7e86fb503d115b51c7d17296bb323..9d92865e4780fc5f0f9ea78ab9fe7356e0c0c661 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -112,7 +112,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* // enqueue if (pData != NULL) { pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK; - pData->sourceVg = pReq->sourceVg; + pData->srcVgId = pReq->dataSrcVgId; // decode /*pData->blocks = pReq->data;*/ /*pBlock->sourceVer = pReq->sourceVer;*/ @@ -133,7 +133,42 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); pCont->inputStatus = status; pCont->streamId = pReq->streamId; - pCont->taskId = pReq->sourceTaskId; + pCont->taskId = pReq->upstreamTaskId; + pRsp->pCont = buf; + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); + tmsgSendRsp(pRsp); + return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; +} + +int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + int8_t status = TASK_INPUT_STATUS__NORMAL; + + // enqueue + if (pData != NULL) { + pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK; + pData->srcVgId = 0; + // decode + /*pData->blocks = pReq->data;*/ + /*pBlock->sourceVer = pReq->sourceVer;*/ + streamRetrieveReqToData(pReq, pData); + if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) { + status = TASK_INPUT_STATUS__NORMAL; + } else { + status = TASK_INPUT_STATUS__FAILED; + } + } else { + /*streamTaskInputFail(pTask);*/ + /*status = TASK_INPUT_STATUS__FAILED;*/ + } + + // rsp by input status + void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp)); + ((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId); + SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); + pCont->streamId = pReq->streamId; + pCont->rspToTaskId = pReq->srcTaskId; + pCont->rspFromTaskId = pReq->dstTaskId; pRsp->pCont = buf; pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); @@ -141,7 +176,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->sourceTaskId); + qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->upstreamTaskId); // 1. handle input streamTaskEnqueue(pTask, pReq, pRsp); @@ -208,3 +243,22 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) // return 0; } + +int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { + qInfo("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId); + + streamTaskEnqueueRetrieve(pTask, pReq, pRsp); + + ASSERT(pTask->execType != TASK_EXEC__NONE); + streamExec(pTask, pTask->pMsgCb); + + ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); + streamDispatch(pTask, pTask->pMsgCb); + + return 0; +} + +int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) { + // + return 0; +} diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 6699e86b1e6137cd9f7a5f873d207c23a0cc09f9..ef328ecf848de7d0fdde0ab8780c9cefd62943bc 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,27 +15,6 @@ #include "streamInc.h" -#if 0 -int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) { - int32_t tlen = 0; - tlen += taosEncodeFixedI8(buf, pOutput->type); - tlen += taosEncodeFixedI32(buf, pOutput->sourceVg); - tlen += taosEncodeFixedI64(buf, pOutput->sourceVer); - ASSERT(pOutput->type == STREAM_INPUT__DATA_BLOCK); - tlen += tEncodeDataBlocks(buf, pOutput->blocks); - return tlen; -} - -void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) { - buf = taosDecodeFixedI8(buf, &pInput->type); - buf = taosDecodeFixedI32(buf, &pInput->sourceVg); - buf = taosDecodeFixedI64(buf, &pInput->sourceVer); - ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); - buf = tDecodeDataBlocks(buf, &pInput->blocks); - return (void*)buf; -} -#endif - int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { int32_t blockNum = pReq->blockNum; SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock)); @@ -54,8 +33,21 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); // TODO: refactor pDataBlock->info.type = pRetrieve->streamBlockType; - pDataBlock->info.childId = pReq->sourceChildId; + pDataBlock->info.childId = pReq->upstreamChildId; + } + pData->blocks = pArray; + return 0; +} + +int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) { + SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock)); + if (pArray == NULL) { + return -1; } + taosArraySetSize(pArray, 1); + SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; + SSDataBlock* pBlock = taosArrayGet(pArray, 0); + blockCompressDecode(pBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); pData->blocks = pArray; return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 1894f697c01d550fd4877d2dc533d527b4d51eb4..993fd3dbd7c5cfd0196063bb409c238b19b5b705 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -19,9 +19,9 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); @@ -40,9 +40,9 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; ASSERT(pReq->blockNum > 0); @@ -62,6 +62,102 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { return 0; } +int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { + // + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1; + if (tEncodeBinary(pEncoder, (const uint8_t*)&pReq->pRetrieve, pReq->retrieveLen) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { + int32_t tlen = 0; + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1; + if (tDecodeBinary(pDecoder, (uint8_t**)&pReq->pRetrieve, &pReq->retrieveLen) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) { + SRetrieveTableRsp* pRetrieve = NULL; + void* buf = NULL; + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + + pRetrieve = taosMemoryCalloc(1, dataStrLen); + if (pRetrieve == NULL) return -1; + + pRetrieve->useconds = 0; + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->streamBlockType = pBlock->info.type; + pRetrieve->numOfRows = htonl(pBlock->info.rows); + pRetrieve->numOfCols = htonl(pBlock->info.numOfCols); + + int32_t actualLen = 0; + blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false); + + SStreamRetrieveReq req = { + .streamId = pTask->streamId, + .srcNodeId = pTask->nodeId, + .srcTaskId = pTask->taskId, + .pRetrieve = pRetrieve, + }; + + int32_t sz = taosArrayGetSize(pTask->childEpInfo); + ASSERT(sz > 0); + for (int32_t i = 0; i < sz; i++) { + SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); + req.dstNodeId = pEpInfo->nodeId; + req.dstTaskId = pEpInfo->taskId; + int32_t code; + int32_t len; + tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code); + if (code < 0) { + ASSERT(0); + return -1; + } + + buf = rpcMallocCont(sizeof(SMsgHead) + len); + if (buf == NULL) { + goto FAIL; + } + + ((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + tEncodeStreamRetrieveReq(&encoder, &req); + + SRpcMsg rpcMsg = { + .code = 0, + .msgType = TDMT_STREAM_RETRIEVE, + .pCont = buf, + .contLen = len, + }; + + if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { + ASSERT(0); + return -1; + } + } + return 0; +FAIL: + if (pRetrieve) taosMemoryFree(pRetrieve); + if (buf) taosMemoryFree(buf); + return -1; +} + static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); @@ -94,9 +190,9 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM SStreamDispatchReq req = { .streamId = pTask->streamId, - .sourceTaskId = pTask->taskId, - .sourceVg = data->sourceVg, - .sourceChildId = pTask->childId, + .dataSrcVgId = data->srcVgId, + .upstreamTaskId = pTask->taskId, + .upstreamChildId = pTask->selfChildId, .upstreamNodeId = pTask->nodeId, .blockNum = blockNum, }; @@ -147,7 +243,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM ASSERT(vgId > 0 || vgId == SNODE_HANDLE); req.taskId = downstreamTaskId; - qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId, + qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId, downstreamTaskId, vgId); // serialize diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bab86223bf9270463897eb31602619645d8220df..4d9916e196d0359fad845abba4a7f2abefecb12b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -13,8 +13,7 @@ * along with this program. If not, see . */ -#include "executor.h" -#include "tstream.h" +#include "streamInc.h" static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { void* exec = pTask->exec.executor; @@ -46,9 +45,17 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ASSERT(false); } if (output == NULL) break; + + if (output->info.type == STREAM_RETRIEVE) { + if (streamBroadcastToChildren(pTask, output) < 0) { + // TODO + } + continue; + } + // TODO: do we need free memory? SSDataBlock* outputCopy = createOneDataBlock(output, true); - outputCopy->info.childId = pTask->childId; + outputCopy->info.childId = pTask->selfChildId; taosArrayPush(pRes, outputCopy); } return 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6dfaa4cb74f408eda641958167f6ba45c58dbdd3..8a7c425f79c7402d538ead4dd5b7d938249df0f8 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -30,6 +30,22 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { return pTask; } +int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) { + if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1; + return 0; +} + +int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { + if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1; + return 0; +} + int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { /*if (tStartEncode(pEncoder) < 0) return -1;*/ if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; @@ -41,12 +57,19 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->dataScan) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->childId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; + int32_t epSz = taosArrayGetSize(pTask->childEpInfo); + if (tEncodeI32(pEncoder, epSz) < 0) return -1; + for (int32_t i = 0; i < epSz; i++) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->childEpInfo, i); + if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; + } + if (pTask->execType != TASK_EXEC__NONE) { if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; } @@ -90,12 +113,22 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->dataScan) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->childId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; + int32_t epSz; + if (tDecodeI32(pDecoder, &epSz) < 0) return -1; + pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*)); + for (int32_t i = 0; i < epSz; i++) { + SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); + if (pInfo == NULL) return -1; + if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) return -1; + taosArrayPush(pTask->childEpInfo, &pInfo); + } + if (pTask->execType != TASK_EXEC__NONE) { if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; }