提交 5e253cc8 编写于 作者: L Liu Jicong

refactor(stream): fill history

上级 2e640c38
...@@ -79,6 +79,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); ...@@ -79,6 +79,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
*/ */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema); qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
/** /**
* Set multiple input data blocks for the stream scan. * Set multiple input data blocks for the stream scan.
* @param tinfo * @param tinfo
......
...@@ -1259,7 +1259,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { ...@@ -1259,7 +1259,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
pSubmit = streamDataSubmitNew(pReq); pSubmit = streamDataSubmitNew(pReq);
if (pSubmit == NULL) { if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("failed to create data submit for stream since out of memory"); tqError("failed to create data submit for stream since out of memory");
failed = true; failed = true;
} }
...@@ -1268,18 +1268,21 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { ...@@ -1268,18 +1268,21 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) continue; if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) {
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
continue;
}
qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);
if (!failed) { if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
qError("stream task input failed, task id %d", pTask->taskId); tqError("stream task input failed, task id %d", pTask->taskId);
continue; continue;
} }
if (streamSchedExec(pTask) < 0) { if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId); tqError("stream task launch failed, task id %d", pTask->taskId);
continue; continue;
} }
} else { } else {
......
...@@ -150,18 +150,13 @@ typedef struct { ...@@ -150,18 +150,13 @@ typedef struct {
SSchemaWrapper* schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
int64_t lastScanUid;
int8_t recoverStep; int8_t recoverStep;
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
int64_t recoverStartVer;
int64_t recoverEndVer;
int64_t fillHistoryVer1; int64_t fillHistoryVer1;
int64_t fillHistoryVer2; int64_t fillHistoryVer2;
int8_t triggerSaved; // int8_t triggerSaved;
int64_t deleteMarkSaved; // int64_t deleteMarkSaved;
SStreamState* pState; SStreamState* pState;
} SStreamTaskInfo; } SStreamTaskInfo;
...@@ -461,8 +456,10 @@ typedef struct SPartitionDataInfo { ...@@ -461,8 +456,10 @@ typedef struct SPartitionDataInfo {
typedef struct STimeWindowAggSupp { typedef struct STimeWindowAggSupp {
int8_t calTrigger; int8_t calTrigger;
int64_t waterMark; int8_t calTriggerSaved;
int64_t deleteMark; int64_t deleteMark;
int64_t deleteMarkSaved;
int64_t waterMark;
TSKEY maxTs; TSKEY maxTs;
TSKEY minTs; TSKEY minTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
......
...@@ -70,6 +70,26 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf ...@@ -70,6 +70,26 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
} }
} }
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
{
ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
if (pOperator->numOfDownstream > 1) { // not handle this in join query
qError("join not supported for stream block scan, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
pOperator->status = OP_NOT_OPENED;
return doSetStreamOpOpen(pOperator->pDownstream[0], id);
}
}
return 0;
}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
...@@ -117,7 +137,22 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -117,7 +137,22 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
} }
static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR;
}
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamOpOpen(pTaskInfo->pRoot, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else {
qDebug("%s set the stream block successfully", GET_TASKID(pTaskInfo));
}
return code;
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
...@@ -707,8 +742,7 @@ int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) { ...@@ -707,8 +742,7 @@ int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) { int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.recoverStartVer = 0; pTaskInfo->streamInfo.fillHistoryVer1 = ver;
pTaskInfo->streamInfo.recoverEndVer = ver;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
return 0; return 0;
} }
...@@ -716,8 +750,7 @@ int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) { ...@@ -716,8 +750,7 @@ int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) { int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.recoverStartVer = pTaskInfo->streamInfo.recoverEndVer; pTaskInfo->streamInfo.fillHistoryVer2 = ver;
pTaskInfo->streamInfo.recoverEndVer = ver;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;
return 0; return 0;
} }
...@@ -738,22 +771,44 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { ...@@ -738,22 +771,44 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger; ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pInfo->twAggSup.calTriggerSaved == 0);
ASSERT(pInfo->twAggSup.deleteMarkSaved == 0);
qInfo("save stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger; ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pInfo->twAggSup.calTriggerSaved == 0);
ASSERT(pInfo->twAggSup.deleteMarkSaved == 0);
qInfo("save stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger; ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pInfo->twAggSup.calTriggerSaved == 0);
ASSERT(pInfo->twAggSup.deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger;
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
} }
...@@ -783,21 +838,36 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { ...@@ -783,21 +838,36 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved; ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);
pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved; ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);
pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
pInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved; ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);
pInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} }
// iterate operator tree // iterate operator tree
......
...@@ -1841,6 +1841,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1841,6 +1841,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return pBlock; return pBlock;
} }
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
pTSInfo->cond.startVersion = 0;
pTSInfo->cond.endVersion = -1;
return NULL; return NULL;
} }
#endif #endif
......
...@@ -2453,7 +2453,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2453,7 +2453,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} else { // non-linear interpolation } else { // non-linear interpolation
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
} }
} }
...@@ -3302,6 +3301,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3302,6 +3301,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
// for test 315360000000 // for test 315360000000
.deleteMark = 1000LL * 60LL * 60LL * 24LL * 365LL * 10LL, .deleteMark = 1000LL * 60LL * 60LL * 24LL * 365LL * 10LL,
// .deleteMark = INT64_MAX, // .deleteMark = INT64_MAX,
.deleteMarkSaved = 0,
.calTriggerSaved = 0,
}; };
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
......
...@@ -43,6 +43,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -43,6 +43,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data); void streamFreeQitem(SStreamQueueItem* data);
......
...@@ -250,7 +250,7 @@ FAIL: ...@@ -250,7 +250,7 @@ FAIL:
return code; return code;
} }
int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL; void* buf = NULL;
int32_t code = -1; int32_t code = -1;
SRpcMsg msg = {0}; SRpcMsg msg = {0};
...@@ -371,7 +371,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -371,7 +371,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId, qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
downstreamTaskId, vgId); downstreamTaskId, vgId);
if (streamDispatchOneReq(pTask, &req, vgId, pEpSet) < 0) { if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) {
goto FAIL_FIXED_DISPATCH; goto FAIL_FIXED_DISPATCH;
} }
code = 0; code = 0;
...@@ -433,7 +433,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -433,7 +433,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
if (pReqs[i].blockNum > 0) { if (pReqs[i].blockNum > 0) {
// send // send
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (streamDispatchOneReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) {
goto FAIL_SHUFFLE_DISPATCH; goto FAIL_SHUFFLE_DISPATCH;
} }
} }
......
...@@ -90,6 +90,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -90,6 +90,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
qSetStreamOpOpen(exec);
while (1) { while (1) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
...@@ -127,7 +129,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -127,7 +129,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes; qRes->blocks = pRes;
streamTaskOutput(pTask, qRes); streamTaskOutput(pTask, qRes);
// TODO stream sched dispatch
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
}
} }
return 0; return 0;
} }
......
...@@ -45,7 +45,6 @@ int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* ...@@ -45,7 +45,6 @@ int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req*
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) { int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
// //
return streamScanExec(pTask, 100); return streamScanExec(pTask, 100);
// TODO next: dispatch msg to launch scan step2
} }
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) { int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) {
...@@ -66,11 +65,20 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { ...@@ -66,11 +65,20 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
SStreamRecoverFinishReq req = { SStreamRecoverFinishReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
.taskId = pTask->taskId,
.childId = pTask->selfChildId, .childId = pTask->selfChildId,
}; };
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.taskId = pTask->fixedEpDispatcher.taskId;
streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < vgSz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.taskId = pVgInfo->taskId;
streamDispatchOneRecoverFinishReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} }
return 0; return 0;
} }
...@@ -78,9 +86,9 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { ...@@ -78,9 +86,9 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
// agg // agg
int32_t streamAggRecoverPrepare(SStreamTask* pTask) { int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
if (qStreamSetParamForRecover(exec) < 0) { /*if (qStreamSetParamForRecover(exec) < 0) {*/
return -1; /*return -1;*/
} /*}*/
pTask->recoverWaitingChild = taosArrayGetSize(pTask->childEpInfo); pTask->recoverWaitingChild = taosArrayGetSize(pTask->childEpInfo);
return 0; return 0;
} }
...@@ -98,10 +106,12 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { ...@@ -98,10 +106,12 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
} }
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingChild, 1); if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(left >= 0); int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingChild, 1);
if (left == 0) { ASSERT(left >= 0);
streamAggChildrenRecoverFinish(pTask); if (left == 0) {
streamAggChildrenRecoverFinish(pTask);
}
} }
return 0; return 0;
} }
......
...@@ -233,6 +233,7 @@ ...@@ -233,6 +233,7 @@
./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/drop_stream.sim ./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/fillHistoryBasic1.sim
./test.sh -f tsim/stream/distributeInterval0.sim ./test.sh -f tsim/stream/distributeInterval0.sim
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
./test.sh -f tsim/stream/distributeSession0.sim ./test.sh -f tsim/stream/distributeSession0.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1;
sql select * from information_schema.ins_databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream stream1 trigger at_once fill_history 1 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 2 then
print ======$data13
return -1
endi
if $data14 != 2 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500
sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05
print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 12 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 2 then
print ======$data11
return -1
endi
if $data12 != 2 then
print ======$data12
return -1
endi
if $data13 != 24 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 36 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 6 then
print ======$data13
return -1
endi
if $data14 != 3 then
print ======$data14
return -1
endi
if $data15 != 1 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
if $data21 != 2 then
print ======$data21
return -1
endi
if $data22 != 2 then
print ======$data22
return -1
endi
if $data23 != 6 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
if $data01 != 4 then
print ======$data01
return -1
endi
if $data02 != 4 then
print ======$data02
return -1
endi
if $data03 != 50 then
print ======$data03 != 50
return -1
endi
if $data04 != 20 then
print ======$data04 != 20
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
print ======$data11
return -1
endi
if $data12 != 4 then
print ======$data12
return -1
endi
if $data13 != 46 then
print ======$data13 != 46
return -1
endi
if $data14 != 20 then
print ======$data14 != 20
return -1
endi
if $data15 != 1 then
print ======$data15
return -1
endi
# row 2
if $data21 != 4 then
print ======$data21
return -1
endi
if $data22 != 4 then
print ======$data22
return -1
endi
if $data23 != 15 then
print ======$data23
return -1
endi
if $data24 != 4 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
sql create database test2 vgroups 1;
sql select * from information_schema.ins_databases;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sql create stream stream2 trigger at_once fill_history 1 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 2 then
print ======$data13
return -1
endi
if $data14 != 2 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500
sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05
print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 12 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 2 then
print ======$data11
return -1
endi
if $data12 != 2 then
print ======$data12
return -1
endi
if $data13 != 24 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 36 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 6 then
print ======$data13
return -1
endi
if $data14 != 3 then
print ======$data14
return -1
endi
if $data15 != 1 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
if $data21 != 2 then
print ======$data21
return -1
endi
if $data22 != 2 then
print ======$data22
return -1
endi
if $data23 != 6 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
if $data01 != 4 then
print ======$data01
return -1
endi
if $data02 != 4 then
print ======$data02
return -1
endi
if $data03 != 50 then
print ======$data03 != 50
return -1
endi
if $data04 != 20 then
print ======$data04 != 20
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
print ======$data11
return -1
endi
if $data12 != 4 then
print ======$data12
return -1
endi
if $data13 != 46 then
print ======$data13 != 46
return -1
endi
if $data14 != 20 then
print ======$data14 != 20
return -1
endi
if $data15 != 1 then
print ======$data15
return -1
endi
# row 2
if $data21 != 4 then
print ======$data21
return -1
endi
if $data22 != 4 then
print ======$data22
return -1
endi
if $data23 != 15 then
print ======$data23
return -1
endi
if $data24 != 4 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册