提交 9981cecb 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 fe21943c
...@@ -221,13 +221,9 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan ...@@ -221,13 +221,9 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo); void resetTaskInfo(qTaskInfo_t tinfo);
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo);
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);
......
...@@ -607,8 +607,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); ...@@ -607,8 +607,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask); int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
// common // common
......
...@@ -1296,7 +1296,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1296,7 +1296,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
"window:%" PRId64 " - %" PRId64, "window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey); id, pWindow->skey, pWindow->ekey);
qResetStreamInfoTimeWindow(pTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pTask->exec.pExecutor);
} else { } else {
// when related fill-history task exists, update the fill-history time window only when the // when related fill-history task exists, update the fill-history time window only when the
// state transfer is completed. // state transfer is completed.
......
...@@ -62,8 +62,8 @@ typedef struct { ...@@ -62,8 +62,8 @@ typedef struct {
SSchemaWrapper* schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
int8_t recoverStep; int8_t recoverStep;
bool recoverStep1Finished; // bool recoverStep1Finished;
bool recoverStep2Finished; // bool recoverStep2Finished;
int8_t recoverScanFinished; int8_t recoverScanFinished;
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
SVersionRange fillHistoryVer; SVersionRange fillHistoryVer;
......
...@@ -116,17 +116,6 @@ void resetTaskInfo(qTaskInfo_t tinfo) { ...@@ -116,17 +116,6 @@ void resetTaskInfo(qTaskInfo_t tinfo) {
clearStreamBlock(pTaskInfo->pRoot); clearStreamBlock(pTaskInfo->pRoot);
} }
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
if (pTaskInfo == NULL) {
return;
}
qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX);
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX;
}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -341,7 +330,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v ...@@ -341,7 +330,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
return NULL; return NULL;
} }
qResetStreamInfoTimeWindow(pTaskInfo); qStreamInfoResetTimewindowFilter(pTaskInfo);
return pTaskInfo; return pTaskInfo;
} }
...@@ -891,8 +880,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan ...@@ -891,8 +880,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->fillHistoryVer = *pVerRange; pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
pStreamInfo->recoverStep1Finished = false;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64, " - %" PRId64,
...@@ -910,8 +897,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan ...@@ -910,8 +897,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->fillHistoryVer = *pVerRange; pStreamInfo->fillHistoryVer = *pVerRange;
pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
", window:%" PRId64 " - %" PRId64, ", window:%" PRId64 " - %" PRId64,
...@@ -1050,23 +1035,15 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { ...@@ -1050,23 +1035,15 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
return pTaskInfo->streamInfo.recoverScanFinished; return pTaskInfo->streamInfo.recoverScanFinished;
} }
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) { int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverStep1Finished; STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
}
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) { qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64,
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
return pTaskInfo->streamInfo.recoverStep2Finished;
}
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->streamInfo.recoverStep1Finished = true;
pTaskInfo->streamInfo.recoverStep2Finished = true;
// reset the time window pWindow->skey = INT64_MIN;
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; pWindow->ekey = INT64_MAX;
return 0; return 0;
} }
......
...@@ -163,15 +163,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -163,15 +163,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
} }
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t code = 0;
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.pExecutor; int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
bool finished = false;
qSetStreamOpOpen(exec); qSetStreamOpOpen(exec);
bool finished = false;
while (1) { while (!finished) {
if (streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
...@@ -184,44 +183,30 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -184,44 +183,30 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
return -1; return -1;
} }
int32_t batchCnt = 0; int32_t numOfBlocks = 0;
while (1) { while (1) {
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0; return 0;
} }
if (streamTaskShouldPause(&pTask->status)) {
break;
}
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) { if (qExecTask(exec, &output, &ts) < 0) {
continue; continue;
} }
if (output == NULL) { if (output == NULL && qStreamRecoverScanFinished(exec)) {
if (qStreamRecoverScanFinished(exec)) { finished = true;
finished = true;
} else {
qSetStreamOpOpen(exec);
if (streamTaskShouldPause(&pTask->status)) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
code = streamTaskOutputResultBlock(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return code;
}
return 0;
}
}
break; break;
} else {
if (output == NULL) {
ASSERT(0);
}
} }
SSDataBlock block = {0}; SSDataBlock block = {0};
...@@ -229,86 +214,37 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -229,86 +214,37 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
block.info.childId = pTask->info.selfChildId; block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
batchCnt++; numOfBlocks++;
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz);
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz); if (numOfBlocks >= batchSz) {
if (batchCnt >= batchSz) {
break; break;
} }
} }
if (taosArrayGetSize(pRes) == 0) { if (taosArrayGetSize(pRes) > 0) {
taosArrayDestroy(pRes); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
if (finished) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
qDebug("s-task:%s finish recover exec task ", pTask->id.idStr); terrno = TSDB_CODE_OUT_OF_MEMORY;
break; return -1;
} else {
qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
continue;
} }
}
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes; qRes->blocks = pRes;
code = streamTaskOutputResultBlock(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return code;
}
if (finished) { code = streamTaskOutputResultBlock(pTask, qRes);
break; if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
} taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
} taosFreeQitem(qRes);
return 0; return code;
} }
#if 0
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
// fetch all queue item, merge according to batchLimit
int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
if (numOfItems == 0) {
qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
return 0;
}
SStreamQueueItem* pMerged = NULL;
SStreamQueueItem* pItem = NULL;
taosGetQitem(pTask->inputQall, (void**)&pItem);
if (pItem == NULL) {
if (pMerged != NULL) {
// process merged item
} else { } else {
return 0; taosArrayDestroy(pRes);
} }
} }
// if drop
if (pItem->type == STREAM_INPUT__DESTROY) {
// set status drop
return -1;
}
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
}
// exec impl
// output
// try dispatch
return 0; return 0;
} }
#endif
int32_t updateCheckPointInfo(SStreamTask* pTask) { int32_t updateCheckPointInfo(SStreamTask* pTask) {
int64_t ckId = 0; int64_t ckId = 0;
...@@ -404,7 +340,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -404,7 +340,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// expand the query time window for stream scanner // expand the query time window for stream scanner
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
// transfer the ownership of executor state // transfer the ownership of executor state
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
......
...@@ -647,19 +647,9 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { ...@@ -647,19 +647,9 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
return 0; return 0;
} }
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRecoverScanStep1Finished(exec);
}
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRecoverScanStep2Finished(exec);
}
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
return qStreamRecoverSetAllStepFinished(exec); return qStreamInfoResetTimewindowFilter(exec);
} }
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册