提交 0cd87101 编写于 作者: 5 54liuyao

fix:fix fill history bug

上级 325b3439
...@@ -219,6 +219,7 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); ...@@ -219,6 +219,7 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
void qStreamCloseTsdbReader(void* task); void qStreamCloseTsdbReader(void* task);
void resetTaskInfo(qTaskInfo_t tinfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -104,6 +104,12 @@ static void clearStreamBlock(SOperatorInfo* pOperator) { ...@@ -104,6 +104,12 @@ static void clearStreamBlock(SOperatorInfo* pOperator) {
} }
} }
void resetTaskInfo(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->code = 0;
clearStreamBlock(pTaskInfo->pRoot);
}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -618,7 +624,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { ...@@ -618,7 +624,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
pTaskInfo->cost.start = taosGetTimestampUs(); pTaskInfo->cost.start = taosGetTimestampUs();
} }
if (isTaskKilled(pTaskInfo) && pTaskInfo->code != TSDB_CODE_QRY_IN_EXEC) { if (isTaskKilled(pTaskInfo)) {
clearStreamBlock(pTaskInfo->pRoot); clearStreamBlock(pTaskInfo->pRoot);
atomic_store_64(&pTaskInfo->owner, 0); atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
......
...@@ -946,6 +946,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* ...@@ -946,6 +946,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
} }
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) { FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists));
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
pInfo->validBlockIndex = 0; pInfo->validBlockIndex = 0;
} }
......
...@@ -20,6 +20,11 @@ ...@@ -20,6 +20,11 @@
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code; int32_t code;
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
while(atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
qError("stream task wait for the end of fill history");
taosMsleep(2);
continue;
}
// set input // set input
const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; const SStreamQueueItem* pItem = (const SStreamQueueItem*)data;
...@@ -58,6 +63,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -58,6 +63,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if ((code = qExecTask(exec, &output, &ts)) < 0) { if ((code = qExecTask(exec, &output, &ts)) < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
resetTaskInfo(exec);
}
/*ASSERT(false);*/ /*ASSERT(false);*/
qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId,
terrstr()); terrstr());
...@@ -121,8 +129,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -121,8 +129,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) { if (qExecTask(exec, &output, &ts) < 0) {
taosArrayDestroy(pRes); continue;
return -1;
} }
if (output == NULL) { if (output == NULL) {
if (qStreamRecoverScanFinished(exec)) { if (qStreamRecoverScanFinished(exec)) {
......
...@@ -168,7 +168,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) { ...@@ -168,7 +168,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
return qStreamRestoreParam(exec); return qStreamRestoreParam(exec);
} }
int32_t streamSetStatusNormal(SStreamTask* pTask) { int32_t streamSetStatusNormal(SStreamTask* pTask) {
pTask->taskStatus = TASK_STATUS__NORMAL; atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册