提交 de44c916 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 3cef1e7c
......@@ -372,6 +372,7 @@ typedef struct {
int32_t upstreamChildId;
int32_t upstreamNodeId;
int32_t blockNum;
int64_t totalLen;
SArray* dataLen; // SArray<int32_t>
SArray* data; // SArray<SRetrieveTableRsp*>
} SStreamDispatchReq;
......@@ -527,7 +528,7 @@ void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
......
......@@ -43,7 +43,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp, false);
streamProcessDispatchMsg(pTask, &req, &rsp, false);
streamMetaReleaseTask(pSnode->pMeta, pTask);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
......@@ -203,17 +203,13 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp, exec);
SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
streamProcessDispatchMsg(pTask, &req, &rsp, exec);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
} else {
return -1;
}
return 0;
}
int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
......
......@@ -853,14 +853,17 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeSStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
......@@ -874,18 +877,18 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) {
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqDebug("tq recv task check req(reqId:0x%" PRIx64
tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId,
rsp.upstreamNodeId, rsp.status);
pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = 0;
tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64
") %d at node %d, check req from task %d at node %d, rsp status %d",
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
rsp.status);
}
......@@ -893,9 +896,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
if (code < 0) {
tqError("unable to encode rsp %d", __LINE__);
tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
return -1;
}
......@@ -908,6 +912,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
tmsgSendRsp(&rspMsg);
return 0;
}
......@@ -919,17 +924,20 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
tDecoderClear(&decoder);
return -1;
}
tDecoderClear(&decoder);
tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
if (pTask == NULL) {
tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
pTq->pStreamMeta->vgId);
return -1;
}
......@@ -1230,15 +1238,17 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamDispatchReq req;
SDecoder decoder;
SStreamDispatchReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchReq(pTask, &req, &rsp, exec);
streamProcessDispatchMsg(pTask, &req, &rsp, exec);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
} else {
......@@ -1357,7 +1367,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchReq(pTask, &req, &rsp, false);
streamProcessDispatchMsg(pTask, &req, &rsp, false);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
......
......@@ -551,9 +551,9 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
// start to restore all stream tasks
if (tsDisableStream) {
vInfo("vgId:%d, not restore stream tasks, since disabled", pVnode->config.vgId);
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId);
} else {
vInfo("vgId:%d start to restore stream tasks", pVnode->config.vgId);
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq);
}
}
......
......@@ -33,9 +33,12 @@ typedef struct {
static SStreamGlobalEnv streamEnv;
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamDispatch(SStreamTask* pTask);
int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data);
......
......@@ -120,19 +120,16 @@ int32_t streamSchedExec(SStreamTask* pTask) {
}
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
int8_t status = 0;
if (pData == NULL) {
SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId);
if (pBlock == NULL) {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;
qDebug("vgId:%d, s-task:%s failed to received dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr);
qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
pTask->id.idStr);
} else {
pData->type = STREAM_INPUT__DATA_BLOCK;
pData->srcVgId = pReq->dataSrcVgId;
streamConvertDispatchMsgToData(pReq, pData);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock) == 0) {
status = TASK_INPUT_STATUS__NORMAL;
} else { // input queue is full, upstream is blocked now
status = TASK_INPUT_STATUS__BLOCKED;
......@@ -142,15 +139,16 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
// rsp by input status
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = status;
pCont->streamId = htobe64(pReq->streamId);
pCont->upstreamNodeId = htonl(pReq->upstreamNodeId);
pCont->upstreamTaskId = htonl(pReq->upstreamTaskId);
pCont->downstreamNodeId = htonl(pTask->nodeId);
pCont->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf;
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead));
pDispatchRsp->inputStatus = status;
pDispatchRsp->streamId = htobe64(pReq->streamId);
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
pDispatchRsp->downstreamNodeId = htonl(pTask->nodeId);
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
......@@ -211,15 +209,15 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
return code;
}
streamDispatch(pTask);
streamDispatchStreamBlock(pTask);
}
return 0;
}
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId,
pReq->upstreamNodeId);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
// todo add the input queue buffer limitation
streamTaskEnqueueBlocks(pTask, pReq, pRsp);
......@@ -257,7 +255,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
}
// continue dispatch one block to down stream in pipeline
streamDispatch(pTask);
streamDispatchStreamBlock(pTask);
return 0;
}
......@@ -267,7 +265,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
}
/*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
/*streamDispatch(pTask);*/
/*streamDispatchStreamBlock(pTask);*/
/*}*/
return 0;
}
......
......@@ -15,20 +15,28 @@
#include "streamInc.h"
int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
if (pData == NULL) {
return NULL;
}
pData->type = blockType;
pData->srcVgId = srcVg;
int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
if (pArray == NULL) {
return -1;
return NULL;
}
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data));
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
for (int32_t i = 0; i < blockNum; i++) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockDecode(pDataBlock, pRetrieve->data);
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
......@@ -41,7 +49,39 @@ int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDa
}
pData->blocks = pArray;
return 0;
return pData;
}
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
if (pStreamBlocks == NULL) {
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
return NULL;
}
pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlocks->blocks = pRes;
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
pStreamBlocks->childId = pTask->selfChildId;
pStreamBlocks->sourceVer = pSubmit->ver;
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
pStreamBlocks->childId = pTask->selfChildId;
pStreamBlocks->sourceVer = pMerged->ver;
}
return pStreamBlocks;
}
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
if (pBlock == NULL) {
return;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
}
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
......
......@@ -24,6 +24,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
for (int32_t i = 0; i < pReq->blockNum; i++) {
......@@ -45,6 +46,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
ASSERT(pReq->blockNum > 0);
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
......@@ -178,7 +181,7 @@ CLEAR:
return code;
}
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
......@@ -205,6 +208,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
taosArrayPush(pReq->dataLen, &actualLen);
taosArrayPush(pReq->data, &buf);
pReq->totalLen += dataStrLen;
return 0;
}
......@@ -291,7 +295,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
return 0;
}
int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
......@@ -325,6 +329,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p
code = 0;
return 0;
FAIL:
if (buf) rpcFreeCont(buf);
return code;
......@@ -360,7 +365,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
return -1;
}
if (pReqs[j].blockNum == 0) {
......@@ -376,9 +381,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = -1;
int32_t blockNum = taosArrayGetSize(pData->blocks);
ASSERT(blockNum != 0);
int32_t code = 0;
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(numOfBlocks != 0);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq req = {
......@@ -387,19 +392,23 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
.upstreamTaskId = pTask->id.taskId,
.upstreamChildId = pTask->selfChildId,
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum,
.blockNum = numOfBlocks,
};
req.data = taosArrayInit(blockNum, sizeof(void*));
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
req.data = taosArrayInit(numOfBlocks, sizeof(void*));
req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
if (req.data == NULL || req.dataLen == NULL) {
goto FAIL_FIXED_DISPATCH;
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
return code;
}
for (int32_t i = 0; i < blockNum; i++) {
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) {
goto FAIL_FIXED_DISPATCH;
if (streamAddBlockIntoDispatchMsg(pDataBlock, &req) < 0) {
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
return code;
}
}
......@@ -410,19 +419,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
req.taskId = downstreamTaskId;
qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr,
pTask->selfChildId, blockNum, downstreamTaskId, vgId);
pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId);
if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) {
goto FAIL_FIXED_DISPATCH;
if (doSendDispatchMsg(pTask, &req, vgId, pEpSet) < 0) {
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
return code;
}
code = 0;
FAIL_FIXED_DISPATCH:
taosArrayDestroyP(req.data, taosMemoryFree);
taosArrayDestroy(req.dataLen);
return code;
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
ASSERT(rspCnt == 0);
......@@ -452,13 +456,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
pReqs[i].taskId = pVgInfo->taskId;
}
for (int32_t i = 0; i < blockNum; i++) {
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
// TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
for (int32_t j = 0; j < vgSz; j++) {
if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
}
if (pReqs[j].blockNum == 0) {
......@@ -475,7 +479,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
}
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId,
blockNum, vgSz);
numOfBlocks, vgSz);
for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) {
......@@ -483,7 +487,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId,
pReqs[i].blockNum, pVgInfo->vgId);
if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
}
}
......@@ -501,7 +505,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
return code;
}
int32_t streamDispatch(SStreamTask* pTask) {
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
if (numOfElems > 0) {
......
......@@ -21,7 +21,6 @@
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000
static int32_t updateCheckPointInfo (SStreamTask* pTask);
static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
......@@ -43,7 +42,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
int32_t numOfBlocks = taosArrayGetSize(pRes);
if (numOfBlocks > 0) {
SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return -1;
......@@ -243,7 +242,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
streamDispatch(pTask);
streamDispatchStreamBlock(pTask);
}
if (finished) {
......@@ -319,38 +318,6 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
if (pStreamBlocks == NULL) {
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
return NULL;
}
pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlocks->blocks = pRes;
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
pStreamBlocks->childId = pTask->selfChildId;
pStreamBlocks->sourceVer = pSubmit->ver;
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
pStreamBlocks->childId = pTask->selfChildId;
pStreamBlocks->sourceVer = pMerged->ver;
}
return pStreamBlocks;
}
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
if (pBlock == NULL) {
return;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
}
int32_t streamExecForAll(SStreamTask* pTask) {
int32_t code = 0;
while (1) {
......
......@@ -55,7 +55,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
// checkstatus
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
qDebug("s-taks:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version);
qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version);
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
......@@ -72,7 +72,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->checkReqId = req.reqId;
qDebug("s-task:%s at node %d check downstream task:%d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
qDebug("s-task:%s at node %d check downstream task:0x%x at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
......@@ -88,7 +88,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
qDebug("s-task:%s at node %d check downstream task:%d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
......@@ -111,15 +111,16 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
.childId = pRsp->childId,
};
qDebug("s-task:%s at node %d check downstream task %d at node %d (recheck)", pTask->id.idStr, pTask->nodeId,
qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (recheck)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < vgSz; i++) {
int32_t numOfVgs = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo->taskId == req.downstreamTaskId) {
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
......@@ -135,7 +136,9 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) {
}
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId,
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status);
if (pRsp->status == 1) {
......@@ -175,9 +178,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
ASSERT(0);
}
} else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:%d (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr,
qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
taosMsleep(100);
streamRecheckOneDownstream(pTask, pRsp);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册