提交 03c26a9d 编写于 作者: H Haojun Liao

refactor

上级 afe9b848
...@@ -170,6 +170,7 @@ typedef enum EStreamType { ...@@ -170,6 +170,7 @@ typedef enum EStreamType {
STREAM_PULL_DATA, STREAM_PULL_DATA,
STREAM_PULL_OVER, STREAM_PULL_OVER,
STREAM_FILL_OVER, STREAM_FILL_OVER,
STREAM_CHECKPOINT,
STREAM_CREATE_CHILD_TABLE, STREAM_CREATE_CHILD_TABLE,
STREAM_TRANS_STATE, STREAM_TRANS_STATE,
} EStreamType; } EStreamType;
......
...@@ -52,7 +52,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -52,7 +52,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
......
...@@ -440,9 +440,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S ...@@ -440,9 +440,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
return 0; return 0;
} }
int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0; int32_t code = 0;
int32_t numOfBlocks = taosArrayGetSize(pData->blocks); int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(numOfBlocks != 0); ASSERT(numOfBlocks != 0);
...@@ -450,7 +449,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { ...@@ -450,7 +449,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
SStreamDispatchReq req = {0}; SStreamDispatchReq req = {0};
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, ); code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -491,7 +490,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { ...@@ -491,7 +490,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < vgSz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId); code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto FAIL_SHUFFLE_DISPATCH; goto FAIL_SHUFFLE_DISPATCH;
} }
...@@ -501,8 +500,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { ...@@ -501,8 +500,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
// TODO: do not use broadcast // TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
for (int32_t j = 0; j < vgSz; j++) { for (int32_t j = 0; j < vgSz; j++) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH; goto FAIL_SHUFFLE_DISPATCH;
...@@ -522,14 +520,14 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { ...@@ -522,14 +520,14 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
} }
} }
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->info.selfChildId, qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr,
numOfBlocks, vgSz); pTask->info.selfChildId, numOfBlocks, vgSz);
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < vgSz; i++) {
if (pReqs[i].blockNum > 0) { if (pReqs[i].blockNum > 0) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId, qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
pReqs[i].blockNum, pVgInfo->vgId); pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);
code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet); code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
if (code < 0) { if (code < 0) {
...@@ -540,7 +538,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { ...@@ -540,7 +538,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
code = 0; code = 0;
FAIL_SHUFFLE_DISPATCH: FAIL_SHUFFLE_DISPATCH:
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < vgSz; i++) {
taosArrayDestroyP(pReqs[i].data, taosMemoryFree); taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
taosArrayDestroy(pReqs[i].dataLen); taosArrayDestroy(pReqs[i].dataLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册