提交 b9c6e0cd 编写于 作者: H Haojun Liao

refactor: stream invoke previous APIs.

上级 a2a1da06
...@@ -123,7 +123,8 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table ...@@ -123,7 +123,8 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
* @param handle * @param handle
* @return * @return
*/ */
int32_t qExecTask(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds); int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds);
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
/** /**
* kill the ongoing query and free the query handle and corresponding resources automatically * kill the ongoing query and free the query handle and corresponding resources automatically
......
...@@ -612,7 +612,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm ...@@ -612,7 +612,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
while (1) { while (1) {
uint64_t ts; uint64_t ts;
int32_t code = qExecTask(taskInfo, pResList, &ts); int32_t code = qExecTaskOpt(taskInfo, pResList, &ts);
if (code < 0) { if (code < 0) {
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
pItem->level, terrstr(code)); pItem->level, terrstr(code));
......
...@@ -80,22 +80,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -80,22 +80,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
} }
int32_t rowCnt = 0; int32_t rowCnt = 0;
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
while (1) { while (1) {
taosArrayClear(pResList);
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
tqDebug("task start to execute"); tqDebug("task start to execute");
if (qExecTask(task, pResList, &ts) < 0) { if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0); ASSERT(0);
} }
tqDebug("task execute end, get %p", pDataBlock);
if (taosArrayGetSize(pResList) > 0) {
pDataBlock = taosArrayGet(pResList, 0);
tqDebug("task execute end, get %p", pDataBlock);
}
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
if (pRsp->withTbName) { if (pRsp->withTbName) {
...@@ -151,7 +143,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -151,7 +143,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
break; break;
} }
taosArrayDestroy(pResList);
return 0; return 0;
} }
......
...@@ -427,7 +427,7 @@ static void freeBlock(void* param) { ...@@ -427,7 +427,7 @@ static void freeBlock(void* param) {
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
} }
int32_t qExecTask(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId(); int64_t threadId = taosGetSelfPthreadId();
...@@ -496,6 +496,63 @@ int32_t qExecTask(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { ...@@ -496,6 +496,63 @@ int32_t qExecTask(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
return pTaskInfo->code; return pTaskInfo->code;
} }
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
*pRes = NULL;
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
return pTaskInfo->code;
}
if (pTaskInfo->cost.start == 0) {
pTaskInfo->cost.start = taosGetTimestampMs();
}
if (isTaskKilled(pTaskInfo)) {
atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS;
}
// error occurs, record the error code and return to client
int32_t ret = setjmp(pTaskInfo->env);
if (ret != TSDB_CODE_SUCCESS) {
pTaskInfo->code = ret;
cleanUpUdfs();
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
atomic_store_64(&pTaskInfo->owner, 0);
return pTaskInfo->code;
}
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
int64_t st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
if (NULL == *pRes) {
*useconds = pTaskInfo->cost.elapsedTime;
}
cleanUpUdfs();
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
atomic_store_64(&pTaskInfo->owner, 0);
return pTaskInfo->code;
}
int32_t qKillTask(qTaskInfo_t qinfo) { int32_t qKillTask(qTaskInfo_t qinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) { if (pTaskInfo == NULL) {
......
...@@ -88,7 +88,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -88,7 +88,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
// if *taskHandle is NULL, it's killed right now // if *taskHandle is NULL, it's killed right now
if (taskHandle) { if (taskHandle) {
qwDbgSimulateSleep(); qwDbgSimulateSleep();
code = qExecTask(taskHandle, pResList, &useconds); code = qExecTaskOpt(taskHandle, pResList, &useconds);
if (code) { if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
......
...@@ -43,17 +43,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -43,17 +43,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
} }
// exec // exec
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
while (1) { while (1) {
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
taosArrayClear(pResList);
if (qExecTask(exec, pResList, &ts) < 0) {
ASSERT(false); ASSERT(false);
} }
if (output == NULL) {
if (taosArrayGetSize(pResList) == 0) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0}; SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data; const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data;
...@@ -69,7 +65,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -69,7 +65,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
break; break;
} }
output = taosArrayGetP(pResList, 0);
if (output->info.type == STREAM_RETRIEVE) { if (output->info.type == STREAM_RETRIEVE) {
if (streamBroadcastToChildren(pTask, output) < 0) { if (streamBroadcastToChildren(pTask, output) < 0) {
// TODO // TODO
...@@ -84,8 +79,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -84,8 +79,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
} }
taosArrayDestroy(pResList);
return 0; return 0;
} }
...@@ -105,7 +98,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { ...@@ -105,7 +98,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
SArray* pResList = taosArrayInit(4, POINTER_BYTES);
while (1) { while (1) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
...@@ -115,17 +107,14 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { ...@@ -115,17 +107,14 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
int32_t batchCnt = 0; int32_t batchCnt = 0;
while (1) { while (1) {
uint64_t ts = 0; SSDataBlock* output = NULL;
taosArrayClear(pResList); uint64_t ts = 0;
if (qExecTask(exec, pResList, &ts) < 0) { if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(0); ASSERT(0);
} }
if (output == NULL) break;
if (taosArrayGetSize(pResList) == 0) break;
SSDataBlock block = {0}; SSDataBlock block = {0};
SSDataBlock* output = taosArrayGetP(pResList, 0);
assignOneDataBlock(&block, output); assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册