提交 a89ce1a2 编写于 作者: H Haojun Liao

fix(stream): transfer state by using data block.

上级 dea30255
...@@ -152,6 +152,7 @@ enum { ...@@ -152,6 +152,7 @@ enum {
STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES, STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT, STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__TRANS_STATE,
STREAM_INPUT__REF_DATA_BLOCK, STREAM_INPUT__REF_DATA_BLOCK,
STREAM_INPUT__DESTROY, STREAM_INPUT__DESTROY,
}; };
......
...@@ -627,6 +627,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask); ...@@ -627,6 +627,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
int32_t appendTranstateIntoInputQ(SStreamTask* pTask);
// agg level // agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
......
...@@ -390,6 +390,11 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -390,6 +390,11 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE) {
streamTaskFillHistoryFinished(pTask); streamTaskFillHistoryFinished(pTask);
streamTaskEndScanWAL(pTask); streamTaskEndScanWAL(pTask);
code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
}
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states. } else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask); code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this if (code != TSDB_CODE_SUCCESS) { // todo handle this
...@@ -460,6 +465,40 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu ...@@ -460,6 +465,40 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
} }
} }
int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain);
return 0;
}
// transfer the ownership of executor state
qDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", id);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
pTask->status.transferState = true;
}
// dispatch the transtate block to downstream task immediately
if (level == TASK_LEVEL__SOURCE || level == TASK_LEVEL__AGG) {
// pBlock-> = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
if (code == 0) {
streamDispatchStreamBlock(pTask);
} else {
streamFreeQitem((SStreamQueueItem*)pBlock);
}
}
return code;
}
/** /**
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec. * appropriate batch of blocks should be handled in 5 to 10 sec.
...@@ -484,6 +523,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -484,6 +523,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break; break;
} }
if (pInput->type == STREAM_INPUT__TRANS_STATE) {
streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
return 0;
}
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
...@@ -557,17 +601,15 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) { ...@@ -557,17 +601,15 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
// 1. notify all downstream tasks to transfer executor state after handle all history blocks. // 1. notify all downstream tasks to transfer executor state after handle all history blocks.
int32_t code = streamDispatchTransferStateMsg(pTask); // pTask->status.transferState = true;
if (code != TSDB_CODE_SUCCESS) { appendTranstateIntoInputQ(pTask);
// todo handle error
}
// 2. do transfer stream task operator states. // 2. do transfer stream task operator states.
pTask->status.transferState = true; // todo remove this
code = streamDoTransferStateToStreamTask(pTask); // int32_t code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle error // if (code != TSDB_CODE_SUCCESS) { // todo handle error
return code; // return code;
} // }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -415,6 +415,35 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe ...@@ -415,6 +415,35 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe
return 0; return 0;
} }
int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
pBlock->info.type = STREAM_TRANS_STATE;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock;
taosArrayPush(pTranstate->blocks, pBlock);
taosMemoryFree(pBlock);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTranstate) < 0) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
streamSchedExec(pTask);
return TSDB_CODE_SUCCESS;
}
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册