提交 1b26da1b 编写于 作者: L Liu Jicong

refactor(stream): distributed execution

上级 2677a3c8
......@@ -187,6 +187,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
......
......@@ -285,12 +285,6 @@ struct SStreamTask {
int8_t inputStatus;
int8_t outputStatus;
#if 0
STaosQueue* inputQ;
STaosQall* inputQAll;
STaosQueue* outputQ;
STaosQall* outputQAll;
#endif
SStreamQueue* inputQueue;
SStreamQueue* outputQueue;
......@@ -371,13 +365,6 @@ typedef struct {
int32_t taskId;
} SStreamTaskRunReq;
typedef struct {
// SMsgHead head;
int64_t streamId;
int64_t version;
SArray* res; // SArray<SSDataBlock>
} SStreamSinkReq;
typedef struct {
int64_t streamId;
int32_t taskId;
......@@ -413,10 +400,6 @@ typedef struct {
int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
int32_t streamTaskRun(SStreamTask* pTask);
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb);
......
......@@ -216,8 +216,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
// TODO wrap in destroy func
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree);
if (rsp.withSchema) {
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
......
......@@ -25,6 +25,7 @@ extern "C" {
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data);
#ifdef __cplusplus
}
......
......@@ -32,7 +32,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
}
tEndEncode(pEncoder);
return 0;
return pEncoder->pos;
}
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
......@@ -60,11 +60,150 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
return 0;
}
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows);
int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen);
taosArrayPush(pReq->data, &buf);
return 0;
}
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
void* buf = NULL;
int32_t code = -1;
int32_t blockNum = taosArrayGetSize(data->blocks);
ASSERT(blockNum != 0);
SStreamDispatchReq req = {
.streamId = pTask->streamId,
.data = data,
.sourceTaskId = pTask->taskId,
.sourceVg = data->sourceVg,
.sourceChildId = pTask->childId,
.blockNum = blockNum,
};
req.data = taosArrayInit(blockNum, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
if (req.data == NULL || req.dataLen == NULL) {
goto FAIL;
}
for (int32_t i = 0; i < blockNum; i++) {
SSDataBlock* pDataBlock = taosArrayGet(data->blocks, i);
if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) {
goto FAIL;
}
}
int32_t vgId = 0;
int32_t downstreamTaskId = 0;
// find ep
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
vgId = pTask->fixedEpDispatcher.nodeId;
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
downstreamTaskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
// TODO get ctbName
char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0};
SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
// get vg and ep
// TODO: get hash function by hashMethod
// get groupId, compute hash value
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
// get node
// TODO: optimize search process
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
vgId = pVgInfo->vgId;
downstreamTaskId = pVgInfo->taskId;
*ppEpSet = &pVgInfo->epSet;
break;
}
}
ASSERT(vgId != 0);
}
req.taskId = downstreamTaskId;
// serialize
int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
if (code < 0) goto FAIL;
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
code = -1;
goto FAIL;
}
((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamDispatchReq(&encoder, &req)) < 0) {
goto FAIL;
}
tEncoderClear(&encoder);
pMsg->contLen = tlen + sizeof(SMsgHead);
pMsg->pCont = buf;
code = 0;
FAIL:
if (buf) taosMemoryFree(buf);
if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree);
if (req.dataLen) taosArrayDestroy(req.dataLen);
return code;
}
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) {
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0};
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {
ASSERT(0);
return -1;
}
int32_t qType;
if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) {
qType = FETCH_QUEUE;
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH_WRITE) {
qType = WRITE_QUEUE;
} else {
ASSERT(0);
}
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
// TODO
ASSERT(0);
}
return 0;
}
......
......@@ -41,12 +41,9 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
}
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
ASSERT(queue == pTask->outputQueue);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
ASSERT(queue == pTask->outputQueue);
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(queue == pTask->outputQueue);
streamDispatch(pTask, pMsgCb, pBlock);
}
streamQueueProcessSuccess(queue);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册