提交 9e4b3adc 编写于 作者: L Liu Jicong

fix(stream): msg dispatch

上级 34749823
...@@ -339,6 +339,7 @@ typedef struct { ...@@ -339,6 +339,7 @@ typedef struct {
int32_t sourceTaskId; int32_t sourceTaskId;
int32_t sourceVg; int32_t sourceVg;
int32_t sourceChildId; int32_t sourceChildId;
int32_t upstreamNodeId;
#if 0 #if 0
int64_t sourceVer; int64_t sourceVer;
#endif #endif
......
...@@ -353,7 +353,9 @@ SArray *vmGetMsgHandles() { ...@@ -353,7 +353,9 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -448,7 +448,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -448,7 +448,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = pMsg->pCont; SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId; int32_t taskId = pRsp->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp); streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
......
...@@ -737,8 +737,8 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) { ...@@ -737,8 +737,8 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes; SSDataBlock* pSDB = pInfo->pUpdateRes;
STimeWindow win = { STimeWindow win = {
.skey = INT64_MIN, .skey = INT64_MIN,
.ekey = INT64_MAX, .ekey = INT64_MAX,
}; };
bool needRead = false; bool needRead = false;
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) { if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) {
...@@ -846,7 +846,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -846,7 +846,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
if (pInfo->validBlockIndex >= total) { if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo); /*doClearBufferedBlocks(pInfo);*/
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
......
...@@ -59,7 +59,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -59,7 +59,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
// rsp by input status // rsp by input status
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg); ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = status; pCont->inputStatus = status;
pCont->streamId = pReq->streamId; pCont->streamId = pReq->streamId;
......
...@@ -22,6 +22,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p ...@@ -22,6 +22,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1; if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
...@@ -42,6 +43,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { ...@@ -42,6 +43,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
ASSERT(pReq->blockNum > 0); ASSERT(pReq->blockNum > 0);
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
...@@ -94,6 +96,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM ...@@ -94,6 +96,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
.sourceTaskId = pTask->taskId, .sourceTaskId = pTask->taskId,
.sourceVg = data->sourceVg, .sourceVg = data->sourceVg,
.sourceChildId = pTask->childId, .sourceChildId = pTask->childId,
.upstreamNodeId = pTask->nodeId,
.blockNum = blockNum, .blockNum = blockNum,
}; };
...@@ -184,13 +187,17 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -184,13 +187,17 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
#endif #endif
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) return 0; if (pBlock == NULL) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return 0;
}
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK); ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
SRpcMsg dispatchMsg = {0}; SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL; SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) { if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0); ASSERT(0);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return -1; return -1;
} }
......
...@@ -65,6 +65,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -65,6 +65,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
} }
qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes; qRes->blocks = pRes;
/*qRes->sourceVg = pTask->nodeId;*/
if (streamTaskOutput(pTask, qRes) < 0) { if (streamTaskOutput(pTask, qRes) < 0) {
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
...@@ -76,7 +77,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -76,7 +77,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
streamDataSubmitRefDec((SStreamDataSubmit*)data); streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data); taosFreeQitem(data);
} else { } else {
/*taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);*/ taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data); taosFreeQitem(data);
} }
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册