提交 90687bae 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 d8525123
...@@ -34,7 +34,7 @@ typedef struct { ...@@ -34,7 +34,7 @@ typedef struct {
static SStreamGlobalEnv streamEnv; static SStreamGlobalEnv streamEnv;
int32_t streamDispatch(SStreamTask* pTask); int32_t streamDispatch(SStreamTask* pTask);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
......
...@@ -121,24 +121,22 @@ int32_t streamSchedExec(SStreamTask* pTask) { ...@@ -121,24 +121,22 @@ int32_t streamSchedExec(SStreamTask* pTask) {
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
int8_t status;
// enqueue data block int8_t status = 0;
if (pData != NULL) { if (pData == NULL) {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;
qDebug("vgId:%d, s-task:%s failed to received dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr);
} else {
pData->type = STREAM_INPUT__DATA_BLOCK; pData->type = STREAM_INPUT__DATA_BLOCK;
pData->srcVgId = pReq->dataSrcVgId; pData->srcVgId = pReq->dataSrcVgId;
// decode
/*pData->blocks = pReq->data;*/ streamConvertDispatchMsgToData(pReq, pData);
/*pBlock->sourceVer = pReq->sourceVer;*/
streamDispatchReqToData(pReq, pData);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL; status = TASK_INPUT_STATUS__NORMAL;
} else { // input queue is full, upstream is blocked now } else { // input queue is full, upstream is blocked now
status = TASK_INPUT_STATUS__BLOCKED; status = TASK_INPUT_STATUS__BLOCKED;
} }
} else {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;
} }
// rsp by input status // rsp by input status
...@@ -219,7 +217,7 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { ...@@ -219,7 +217,7 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
} }
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId,
pReq->upstreamNodeId); pReq->upstreamNodeId);
// todo add the input queue buffer limitation // todo add the input queue buffer limitation
...@@ -294,16 +292,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -294,16 +292,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
px->submit.msgLen, px->submit.ver, numOfBlocks, size); px->submit.msgLen, px->submit.ver, total, size);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
size); size);
streamDataSubmitDestroy(px); streamDataSubmitDestroy(px);
taosFreeQitem(pItem); taosFreeQitem(pItem);
...@@ -312,22 +307,20 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -312,22 +307,20 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
size); size);
return -1; return -1;
} }
qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks); qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total);
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) { } else if (type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__GET_RES) { } else if (type == STREAM_INPUT__GET_RES) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
} }
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "streamInc.h" #include "streamInc.h"
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
int32_t blockNum = pReq->blockNum; int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
if (pArray == NULL) { if (pArray == NULL) {
...@@ -39,6 +39,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock ...@@ -39,6 +39,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->upstreamChildId; pDataBlock->info.childId = pReq->upstreamChildId;
} }
pData->blocks = pArray; pData->blocks = pArray;
return 0; return 0;
} }
......
...@@ -320,7 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p ...@@ -320,7 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p
msg.pCont = buf; msg.pCont = buf;
msg.msgType = pTask->dispatchMsgType; msg.msgType = pTask->dispatchMsgType;
qDebug("dispatch from s-task:%s to taskId:%d vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); qDebug("dispatch from s-task:%s to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
code = 0; code = 0;
......
...@@ -272,6 +272,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -272,6 +272,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldPause(&pTask->status)) {
return 0; return 0;
} }
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) { if (qItem == NULL) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册