未验证 提交 df1967de 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14120 from taosdata/feature/stream

feat(stream): support retrieve
...@@ -46,6 +46,7 @@ typedef enum EStreamType { ...@@ -46,6 +46,7 @@ typedef enum EStreamType {
STREAM_INVALID, STREAM_INVALID,
STREAM_GET_ALL, STREAM_GET_ALL,
STREAM_DELETE, STREAM_DELETE,
STREAM_RETRIEVE,
} EStreamType; } EStreamType;
typedef struct { typedef struct {
......
...@@ -201,15 +201,14 @@ enum { ...@@ -201,15 +201,14 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_QND_MSG)
//shared by snode and vnode
TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
......
...@@ -77,7 +77,7 @@ typedef struct { ...@@ -77,7 +77,7 @@ typedef struct {
typedef struct { typedef struct {
int8_t type; int8_t type;
int32_t sourceVg; int32_t srcVgId;
int64_t sourceVer; int64_t sourceVer;
SArray* blocks; // SArray<SSDataBlock*> SArray* blocks; // SArray<SSDataBlock*>
...@@ -145,11 +145,6 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit); ...@@ -145,11 +145,6 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit); SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);
#if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
#endif
typedef struct { typedef struct {
char* qmsg; char* qmsg;
// followings are not applicable to encoder and decoder // followings are not applicable to encoder and decoder
...@@ -234,6 +229,13 @@ enum { ...@@ -234,6 +229,13 @@ enum {
TASK_TRIGGER_STATUS__ACTIVE, TASK_TRIGGER_STATUS__ACTIVE,
}; };
typedef struct {
int32_t nodeId;
int32_t childId;
int32_t taskId;
SEpSet epSet;
} SStreamChildEpInfo;
struct SStreamTask { struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
...@@ -247,13 +249,16 @@ struct SStreamTask { ...@@ -247,13 +249,16 @@ struct SStreamTask {
int8_t dispatchType; int8_t dispatchType;
int16_t dispatchMsgType; int16_t dispatchMsgType;
int8_t dataScan; int8_t isDataScan;
// node info // node info
int32_t childId; int32_t selfChildId;
int32_t nodeId; int32_t nodeId;
SEpSet epSet; SEpSet epSet;
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
// exec // exec
STaskExec exec; STaskExec exec;
...@@ -291,6 +296,9 @@ struct SStreamTask { ...@@ -291,6 +296,9 @@ struct SStreamTask {
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
}; };
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewSStreamTask(int64_t streamId); SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
...@@ -369,9 +377,9 @@ typedef struct { ...@@ -369,9 +377,9 @@ typedef struct {
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int32_t sourceTaskId; int32_t dataSrcVgId;
int32_t sourceVg; int32_t upstreamTaskId;
int32_t sourceChildId; int32_t upstreamChildId;
int32_t upstreamNodeId; int32_t upstreamNodeId;
#if 0 #if 0
int64_t sourceVer; int64_t sourceVer;
...@@ -387,6 +395,23 @@ typedef struct { ...@@ -387,6 +395,23 @@ typedef struct {
int8_t inputStatus; int8_t inputStatus;
} SStreamDispatchRsp; } SStreamDispatchRsp;
typedef struct {
int64_t streamId;
int32_t srcTaskId;
int32_t srcNodeId;
int32_t dstTaskId;
int32_t dstNodeId;
int32_t retrieveLen;
SRetrieveTableRsp* pRetrieve;
} SStreamRetrieveReq;
typedef struct {
int64_t streamId;
int32_t childId;
int32_t rspFromTaskId;
int32_t rspToTaskId;
} SStreamRetrieveRsp;
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
...@@ -401,6 +426,7 @@ typedef struct { ...@@ -401,6 +426,7 @@ typedef struct {
} SStreamTaskRecoverRsp; } SStreamTaskRecoverRsp;
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamSetupTrigger(SStreamTask* pTask);
...@@ -411,6 +437,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp); ...@@ -411,6 +437,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -101,6 +101,8 @@ SArray *smGetMsgHandles() { ...@@ -101,6 +101,8 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
code = 0; code = 0;
_OVER: _OVER:
......
...@@ -359,6 +359,8 @@ SArray *vmGetMsgHandles() { ...@@ -359,6 +359,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -37,7 +37,7 @@ extern bool tsSchedStreamToSnode; ...@@ -37,7 +37,7 @@ extern bool tsSchedStreamToSnode;
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) { static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray); int32_t childId = taosArrayGetSize(pArray);
pTask->childId = childId; pTask->selfChildId = childId;
taosArrayPush(pArray, &pTask); taosArrayPush(pArray, &pTask);
return 0; return 0;
} }
...@@ -270,6 +270,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb ...@@ -270,6 +270,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
pTask->nodeId = pVgroup->vgId; pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
pTask->isDataScan = 0;
// source // source
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
...@@ -306,6 +308,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* ...@@ -306,6 +308,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
} }
mndAddTaskToTaskSet(tasks, pTask); mndAddTaskToTaskSet(tasks, pTask);
ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId);
pTask->nodeId = pStream->fixedSinkVgId; pTask->nodeId = pStream->fixedSinkVgId;
#if 0 #if 0
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
...@@ -315,6 +319,9 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* ...@@ -315,6 +319,9 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
#endif #endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
pTask->isDataScan = 0;
// source // source
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
...@@ -384,6 +391,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -384,6 +391,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pInnerTask = tNewSStreamTask(pStream->uid); pInnerTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
pInnerTask->isDataScan = 0;
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
// input // input
pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
...@@ -446,7 +458,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -446,7 +458,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskSourceLevel, pTask); mndAddTaskToTaskSet(taskSourceLevel, pTask);
pTask->dataScan = 1; pTask->isDataScan = 1;
// input // input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
...@@ -467,6 +479,20 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -467,6 +479,20 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
return -1; return -1;
} }
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
ASSERT(0);
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId;
pEpInfo->taskId = pTask->taskId;
taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
} }
} }
...@@ -491,7 +517,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -491,7 +517,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid); SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskOneLevel, pTask); mndAddTaskToTaskSet(taskOneLevel, pTask);
pTask->dataScan = 1; pTask->isDataScan = 1;
// input // input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
......
...@@ -105,13 +105,14 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) { ...@@ -105,13 +105,14 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
ASSERT(pTask->execType != TASK_EXEC__NONE); ASSERT(pTask->execType != TASK_EXEC__NONE);
ASSERT(pTask->dataScan == 0); ASSERT(pTask->isDataScan == 0);
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
ASSERT(pTask->exec.executor); ASSERT(pTask->exec.executor);
streamSetupTrigger(pTask); streamSetupTrigger(pTask);
qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId); qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId,
pTask->selfChildId);
taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *)); taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *));
...@@ -198,6 +199,34 @@ static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) { ...@@ -198,6 +199,34 @@ static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) {
return code; return code;
} }
static int32_t sndProcessTaskRetrieveReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamRetrieveReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp);
return 0;
}
static int32_t sndProcessTaskRetrieveRsp(SSnode *pNode, SRpcMsg *pMsg) {
//
return 0;
}
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deploy // stream deploy
// stream stop/resume // stream stop/resume
...@@ -221,10 +250,14 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -221,10 +250,14 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return sndProcessTaskDispatchReq(pSnode, pMsg); return sndProcessTaskDispatchReq(pSnode, pMsg);
case TDMT_STREAM_TASK_RECOVER: case TDMT_STREAM_TASK_RECOVER:
return sndProcessTaskRecoverReq(pSnode, pMsg); return sndProcessTaskRecoverReq(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE:
return sndProcessTaskRecoverReq(pSnode, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP: case TDMT_STREAM_TASK_DISPATCH_RSP:
return sndProcessTaskDispatchRsp(pSnode, pMsg); return sndProcessTaskDispatchRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_RECOVER_RSP: case TDMT_STREAM_TASK_RECOVER_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg); return sndProcessTaskRecoverRsp(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg);
default: default:
ASSERT(0); ASSERT(0);
} }
......
...@@ -149,6 +149,8 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); ...@@ -149,6 +149,8 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId); const char* stbFullName, int32_t vgId);
......
...@@ -441,7 +441,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -441,7 +441,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
// exec // exec
if (pTask->execType != TASK_EXEC__NONE) { if (pTask->execType != TASK_EXEC__NONE) {
// expand runners // expand runners
if (pTask->dataScan) { if (pTask->isDataScan) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pStreamReader, .reader = pStreamReader,
...@@ -476,7 +476,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -476,7 +476,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
streamSetupTrigger(pTask); streamSetupTrigger(pTask);
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, TD_VID(pTq->pVnode)); tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->selfChildId, TD_VID(pTq->pVnode));
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
...@@ -616,3 +616,29 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -616,3 +616,29 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
return code; return code;
#endif #endif
} }
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamRetrieveReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp);
return 0;
}
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
//
return 0;
}
...@@ -265,10 +265,14 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -265,10 +265,14 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RECOVER: case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP: case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RECOVER_RSP: case TDMT_STREAM_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
......
...@@ -33,8 +33,13 @@ static SStreamGlobalEnv streamEnv; ...@@ -33,8 +33,13 @@ static SStreamGlobalEnv streamEnv;
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -112,7 +112,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -112,7 +112,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
// enqueue // enqueue
if (pData != NULL) { if (pData != NULL) {
pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK; pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pData->sourceVg = pReq->sourceVg; pData->srcVgId = pReq->dataSrcVgId;
// decode // decode
/*pData->blocks = pReq->data;*/ /*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/ /*pBlock->sourceVer = pReq->sourceVer;*/
...@@ -133,7 +133,42 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -133,7 +133,42 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = status; pCont->inputStatus = status;
pCont->streamId = pReq->streamId; pCont->streamId = pReq->streamId;
pCont->taskId = pReq->sourceTaskId; pCont->taskId = pReq->upstreamTaskId;
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
int8_t status = TASK_INPUT_STATUS__NORMAL;
// enqueue
if (pData != NULL) {
pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pData->srcVgId = 0;
// decode
/*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/
streamRetrieveReqToData(pReq, pData);
if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL;
} else {
status = TASK_INPUT_STATUS__FAILED;
}
} else {
/*streamTaskInputFail(pTask);*/
/*status = TASK_INPUT_STATUS__FAILED;*/
}
// rsp by input status
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->streamId = pReq->streamId;
pCont->rspToTaskId = pReq->srcTaskId;
pCont->rspFromTaskId = pReq->dstTaskId;
pRsp->pCont = buf; pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
...@@ -141,7 +176,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -141,7 +176,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
} }
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->sourceTaskId); qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->upstreamTaskId);
// 1. handle input // 1. handle input
streamTaskEnqueue(pTask, pReq, pRsp); streamTaskEnqueue(pTask, pReq, pRsp);
...@@ -208,3 +243,22 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) ...@@ -208,3 +243,22 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
// //
return 0; return 0;
} }
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qInfo("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->execType != TASK_EXEC__NONE);
streamExec(pTask, pTask->pMsgCb);
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
streamDispatch(pTask, pTask->pMsgCb);
return 0;
}
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) {
//
return 0;
}
...@@ -15,27 +15,6 @@ ...@@ -15,27 +15,6 @@
#include "streamInc.h" #include "streamInc.h"
#if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pOutput->type);
tlen += taosEncodeFixedI32(buf, pOutput->sourceVg);
tlen += taosEncodeFixedI64(buf, pOutput->sourceVer);
ASSERT(pOutput->type == STREAM_INPUT__DATA_BLOCK);
tlen += tEncodeDataBlocks(buf, pOutput->blocks);
return tlen;
}
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
buf = taosDecodeFixedI8(buf, &pInput->type);
buf = taosDecodeFixedI32(buf, &pInput->sourceVg);
buf = taosDecodeFixedI64(buf, &pInput->sourceVer);
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
buf = tDecodeDataBlocks(buf, &pInput->blocks);
return (void*)buf;
}
#endif
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
int32_t blockNum = pReq->blockNum; int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock)); SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock));
...@@ -54,8 +33,21 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock ...@@ -54,8 +33,21 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor // TODO: refactor
pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->sourceChildId; pDataBlock->info.childId = pReq->upstreamChildId;
}
pData->blocks = pArray;
return 0;
}
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
if (pArray == NULL) {
return -1;
} }
taosArraySetSize(pArray, 1);
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pBlock = taosArrayGet(pArray, 0);
blockCompressDecode(pBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
pData->blocks = pArray; pData->blocks = pArray;
return 0; return 0;
} }
......
...@@ -19,9 +19,9 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p ...@@ -19,9 +19,9 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1; if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
...@@ -40,9 +40,9 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { ...@@ -40,9 +40,9 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
ASSERT(pReq->blockNum > 0); ASSERT(pReq->blockNum > 0);
...@@ -62,6 +62,102 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { ...@@ -62,6 +62,102 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
return 0; return 0;
} }
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
//
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1;
if (tEncodeBinary(pEncoder, (const uint8_t*)&pReq->pRetrieve, pReq->retrieveLen) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
int32_t tlen = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1;
if (tDecodeBinary(pDecoder, (uint8_t**)&pReq->pRetrieve, &pReq->retrieveLen) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
SRetrieveTableRsp* pRetrieve = NULL;
void* buf = NULL;
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
pRetrieve = taosMemoryCalloc(1, dataStrLen);
if (pRetrieve == NULL) return -1;
pRetrieve->useconds = 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->numOfCols = htonl(pBlock->info.numOfCols);
int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
SStreamRetrieveReq req = {
.streamId = pTask->streamId,
.srcNodeId = pTask->nodeId,
.srcTaskId = pTask->taskId,
.pRetrieve = pRetrieve,
};
int32_t sz = taosArrayGetSize(pTask->childEpInfo);
ASSERT(sz > 0);
for (int32_t i = 0; i < sz; i++) {
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
req.dstNodeId = pEpInfo->nodeId;
req.dstTaskId = pEpInfo->taskId;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
if (code < 0) {
ASSERT(0);
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + len);
if (buf == NULL) {
goto FAIL;
}
((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeStreamRetrieveReq(&encoder, &req);
SRpcMsg rpcMsg = {
.code = 0,
.msgType = TDMT_STREAM_RETRIEVE,
.pCont = buf,
.contLen = len,
};
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
ASSERT(0);
return -1;
}
}
return 0;
FAIL:
if (pRetrieve) taosMemoryFree(pRetrieve);
if (buf) taosMemoryFree(buf);
return -1;
}
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
...@@ -94,9 +190,9 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM ...@@ -94,9 +190,9 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
SStreamDispatchReq req = { SStreamDispatchReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
.sourceTaskId = pTask->taskId, .dataSrcVgId = data->srcVgId,
.sourceVg = data->sourceVg, .upstreamTaskId = pTask->taskId,
.sourceChildId = pTask->childId, .upstreamChildId = pTask->selfChildId,
.upstreamNodeId = pTask->nodeId, .upstreamNodeId = pTask->nodeId,
.blockNum = blockNum, .blockNum = blockNum,
}; };
...@@ -147,7 +243,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM ...@@ -147,7 +243,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
ASSERT(vgId > 0 || vgId == SNODE_HANDLE); ASSERT(vgId > 0 || vgId == SNODE_HANDLE);
req.taskId = downstreamTaskId; req.taskId = downstreamTaskId;
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId, qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
downstreamTaskId, vgId); downstreamTaskId, vgId);
// serialize // serialize
......
...@@ -13,8 +13,7 @@ ...@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "executor.h" #include "streamInc.h"
#include "tstream.h"
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
...@@ -46,9 +45,17 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -46,9 +45,17 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(false); ASSERT(false);
} }
if (output == NULL) break; if (output == NULL) break;
if (output->info.type == STREAM_RETRIEVE) {
if (streamBroadcastToChildren(pTask, output) < 0) {
// TODO
}
continue;
}
// TODO: do we need free memory? // TODO: do we need free memory?
SSDataBlock* outputCopy = createOneDataBlock(output, true); SSDataBlock* outputCopy = createOneDataBlock(output, true);
outputCopy->info.childId = pTask->childId; outputCopy->info.childId = pTask->selfChildId;
taosArrayPush(pRes, outputCopy); taosArrayPush(pRes, outputCopy);
} }
return 0; return 0;
......
...@@ -30,6 +30,22 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { ...@@ -30,6 +30,22 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
return pTask; return pTask;
} }
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
return 0;
}
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
return 0;
}
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/ /*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
...@@ -41,12 +57,19 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -41,12 +57,19 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dataScan) < 0) return -1; if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->childId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->childEpInfo, i);
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
if (pTask->execType != TASK_EXEC__NONE) { if (pTask->execType != TASK_EXEC__NONE) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
} }
...@@ -90,12 +113,22 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -90,12 +113,22 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dataScan) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
if (pInfo == NULL) return -1;
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) return -1;
taosArrayPush(pTask->childEpInfo, &pInfo);
}
if (pTask->execType != TASK_EXEC__NONE) { if (pTask->execType != TASK_EXEC__NONE) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册