提交 fb596625 编写于 作者: L Liu Jicong

feat(stream): support retrieve

上级 82a1cc8e
......@@ -46,6 +46,7 @@ typedef enum EStreamType {
STREAM_INVALID,
STREAM_GET_ALL,
STREAM_DELETE,
STREAM_RETRIEVE,
} EStreamType;
typedef struct {
......
......@@ -200,15 +200,14 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_QND_MSG)
//shared by snode and vnode
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
......
......@@ -77,7 +77,7 @@ typedef struct {
typedef struct {
int8_t type;
int32_t sourceVg;
int32_t srcVgId;
int64_t sourceVer;
SArray* blocks; // SArray<SSDataBlock*>
......@@ -145,11 +145,6 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit);
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);
#if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
#endif
typedef struct {
char* qmsg;
// followings are not applicable to encoder and decoder
......@@ -234,6 +229,13 @@ enum {
TASK_TRIGGER_STATUS__ACTIVE,
};
typedef struct {
int32_t nodeId;
int32_t childId;
int32_t taskId;
SEpSet epSet;
} SStreamChildEpInfo;
struct SStreamTask {
int64_t streamId;
int32_t taskId;
......@@ -247,13 +249,16 @@ struct SStreamTask {
int8_t dispatchType;
int16_t dispatchMsgType;
int8_t dataScan;
int8_t isDataScan;
// node info
int32_t childId;
int32_t selfChildId;
int32_t nodeId;
SEpSet epSet;
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
// exec
STaskExec exec;
......@@ -291,6 +296,9 @@ struct SStreamTask {
SMsgCb* pMsgCb;
};
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
......@@ -369,9 +377,9 @@ typedef struct {
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t sourceTaskId;
int32_t sourceVg;
int32_t sourceChildId;
int32_t dataSrcVgId;
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
#if 0
int64_t sourceVer;
......@@ -387,6 +395,23 @@ typedef struct {
int8_t inputStatus;
} SStreamDispatchRsp;
typedef struct {
int64_t streamId;
int32_t srcTaskId;
int32_t srcNodeId;
int32_t dstTaskId;
int32_t dstNodeId;
int32_t retrieveLen;
SRetrieveTableRsp* pRetrieve;
} SStreamRetrieveReq;
typedef struct {
int64_t streamId;
int32_t childId;
int32_t rspFromTaskId;
int32_t rspToTaskId;
} SStreamRetrieveRsp;
typedef struct {
int64_t streamId;
int32_t taskId;
......@@ -401,6 +426,7 @@ typedef struct {
} SStreamTaskRecoverRsp;
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
int32_t streamSetupTrigger(SStreamTask* pTask);
......@@ -411,6 +437,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
#ifdef __cplusplus
}
#endif
......
......@@ -101,6 +101,8 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:
......
......@@ -359,6 +359,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -37,7 +37,7 @@ extern bool tsSchedStreamToSnode;
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->childId = childId;
pTask->selfChildId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
......@@ -270,6 +270,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
pTask->isDataScan = 0;
// source
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
......@@ -306,6 +308,8 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
}
mndAddTaskToTaskSet(tasks, pTask);
ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId);
pTask->nodeId = pStream->fixedSinkVgId;
#if 0
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
......@@ -315,6 +319,9 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
#endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
pTask->isDataScan = 0;
// source
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
......@@ -384,6 +391,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pInnerTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
pInnerTask->isDataScan = 0;
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
// input
pInnerTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
......@@ -446,7 +458,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskSourceLevel, pTask);
pTask->dataScan = 1;
pTask->isDataScan = 1;
// input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
......@@ -467,6 +479,20 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
qDestroyQueryPlan(pPlan);
return -1;
}
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
ASSERT(0);
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId;
pEpInfo->taskId = pTask->taskId;
taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
}
}
......@@ -491,7 +517,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskOneLevel, pTask);
pTask->dataScan = 1;
pTask->isDataScan = 1;
// input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
......
......@@ -105,13 +105,14 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
ASSERT(pTask->execType != TASK_EXEC__NONE);
ASSERT(pTask->dataScan == 0);
ASSERT(pTask->isDataScan == 0);
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
ASSERT(pTask->exec.executor);
streamSetupTrigger(pTask);
qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId);
qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId,
pTask->selfChildId);
taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *));
......@@ -198,6 +199,34 @@ static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) {
return code;
}
static int32_t sndProcessTaskRetrieveReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamRetrieveReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp);
return 0;
}
static int32_t sndProcessTaskRetrieveRsp(SSnode *pNode, SRpcMsg *pMsg) {
//
return 0;
}
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deploy
// stream stop/resume
......@@ -221,10 +250,14 @@ int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return sndProcessTaskDispatchReq(pSnode, pMsg);
case TDMT_STREAM_TASK_RECOVER:
return sndProcessTaskRecoverReq(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE:
return sndProcessTaskRecoverReq(pSnode, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return sndProcessTaskDispatchRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_RECOVER_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg);
default:
ASSERT(0);
}
......
......@@ -149,6 +149,8 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId);
......
......@@ -441,7 +441,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
// exec
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
if (pTask->dataScan) {
if (pTask->isDataScan) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
......@@ -476,7 +476,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
streamSetupTrigger(pTask);
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, TD_VID(pTq->pVnode));
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->selfChildId, TD_VID(pTq->pVnode));
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
......@@ -616,3 +616,29 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
return code;
#endif
}
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamRetrieveReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp);
return 0;
}
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
//
return 0;
}
......@@ -265,10 +265,14 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......
......@@ -33,8 +33,13 @@ static SStreamGlobalEnv streamEnv;
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
#ifdef __cplusplus
}
#endif
......
......@@ -112,7 +112,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
// enqueue
if (pData != NULL) {
pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pData->sourceVg = pReq->sourceVg;
pData->srcVgId = pReq->dataSrcVgId;
// decode
/*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/
......@@ -133,7 +133,42 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = status;
pCont->streamId = pReq->streamId;
pCont->taskId = pReq->sourceTaskId;
pCont->taskId = pReq->upstreamTaskId;
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
}
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
int8_t status = TASK_INPUT_STATUS__NORMAL;
// enqueue
if (pData != NULL) {
pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pData->srcVgId = 0;
// decode
/*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/
streamRetrieveReqToData(pReq, pData);
if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL;
} else {
status = TASK_INPUT_STATUS__FAILED;
}
} else {
/*streamTaskInputFail(pTask);*/
/*status = TASK_INPUT_STATUS__FAILED;*/
}
// rsp by input status
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->streamId = pReq->streamId;
pCont->rspToTaskId = pReq->srcTaskId;
pCont->rspFromTaskId = pReq->dstTaskId;
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
......@@ -141,7 +176,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
}
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->sourceTaskId);
qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->upstreamTaskId);
// 1. handle input
streamTaskEnqueue(pTask, pReq, pRsp);
......@@ -208,3 +243,22 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
//
return 0;
}
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qInfo("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->execType != TASK_EXEC__NONE);
streamExec(pTask, pTask->pMsgCb);
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
streamDispatch(pTask, pTask->pMsgCb);
return 0;
}
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp) {
//
return 0;
}
......@@ -15,27 +15,6 @@
#include "streamInc.h"
#if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pOutput->type);
tlen += taosEncodeFixedI32(buf, pOutput->sourceVg);
tlen += taosEncodeFixedI64(buf, pOutput->sourceVer);
ASSERT(pOutput->type == STREAM_INPUT__DATA_BLOCK);
tlen += tEncodeDataBlocks(buf, pOutput->blocks);
return tlen;
}
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
buf = taosDecodeFixedI8(buf, &pInput->type);
buf = taosDecodeFixedI32(buf, &pInput->sourceVg);
buf = taosDecodeFixedI64(buf, &pInput->sourceVer);
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
buf = tDecodeDataBlocks(buf, &pInput->blocks);
return (void*)buf;
}
#endif
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock));
......@@ -54,8 +33,21 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor
pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->sourceChildId;
pDataBlock->info.childId = pReq->upstreamChildId;
}
pData->blocks = pArray;
return 0;
}
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
if (pArray == NULL) {
return -1;
}
taosArraySetSize(pArray, 1);
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pBlock = taosArrayGet(pArray, 0);
blockCompressDecode(pBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
pData->blocks = pArray;
return 0;
}
......
......@@ -19,9 +19,9 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
......@@ -40,9 +40,9 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
ASSERT(pReq->blockNum > 0);
......@@ -62,6 +62,102 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
return 0;
}
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
//
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1;
if (tEncodeBinary(pEncoder, (const uint8_t*)&pReq->pRetrieve, pReq->retrieveLen) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
int32_t tlen = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1;
if (tDecodeBinary(pDecoder, (uint8_t**)&pReq->pRetrieve, &pReq->retrieveLen) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
SRetrieveTableRsp* pRetrieve = NULL;
void* buf = NULL;
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
pRetrieve = taosMemoryCalloc(1, dataStrLen);
if (pRetrieve == NULL) return -1;
pRetrieve->useconds = 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->numOfCols = htonl(pBlock->info.numOfCols);
int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
SStreamRetrieveReq req = {
.streamId = pTask->streamId,
.srcNodeId = pTask->nodeId,
.srcTaskId = pTask->taskId,
.pRetrieve = pRetrieve,
};
int32_t sz = taosArrayGetSize(pTask->childEpInfo);
ASSERT(sz > 0);
for (int32_t i = 0; i < sz; i++) {
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
req.dstNodeId = pEpInfo->nodeId;
req.dstTaskId = pEpInfo->taskId;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
if (code < 0) {
ASSERT(0);
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + len);
if (buf == NULL) {
goto FAIL;
}
((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeStreamRetrieveReq(&encoder, &req);
SRpcMsg rpcMsg = {
.code = 0,
.msgType = TDMT_STREAM_RETRIEVE,
.pCont = buf,
.contLen = len,
};
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
ASSERT(0);
return -1;
}
}
return 0;
FAIL:
if (pRetrieve) taosMemoryFree(pRetrieve);
if (buf) taosMemoryFree(buf);
return -1;
}
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
......@@ -94,9 +190,9 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.sourceTaskId = pTask->taskId,
.sourceVg = data->sourceVg,
.sourceChildId = pTask->childId,
.dataSrcVgId = data->srcVgId,
.upstreamTaskId = pTask->taskId,
.upstreamChildId = pTask->selfChildId,
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum,
};
......@@ -147,7 +243,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
ASSERT(vgId > 0 || vgId == SNODE_HANDLE);
req.taskId = downstreamTaskId;
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->childId,
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
downstreamTaskId, vgId);
// serialize
......
......@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "tstream.h"
#include "streamInc.h"
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
void* exec = pTask->exec.executor;
......@@ -46,9 +45,17 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(false);
}
if (output == NULL) break;
if (output->info.type == STREAM_RETRIEVE) {
if (streamBroadcastToChildren(pTask, output) < 0) {
// TODO
}
continue;
}
// TODO: do we need free memory?
SSDataBlock* outputCopy = createOneDataBlock(output, true);
outputCopy->info.childId = pTask->childId;
outputCopy->info.childId = pTask->selfChildId;
taosArrayPush(pRes, outputCopy);
}
return 0;
......
......@@ -30,6 +30,22 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
return pTask;
}
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
return 0;
}
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
return 0;
}
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
......@@ -41,12 +57,19 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dataScan) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->childId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->childEpInfo, i);
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
if (pTask->execType != TASK_EXEC__NONE) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
......@@ -90,12 +113,22 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dataScan) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->childId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
if (pInfo == NULL) return -1;
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) return -1;
taosArrayPush(pTask->childEpInfo, &pInfo);
}
if (pTask->execType != TASK_EXEC__NONE) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册