未验证 提交 71e39931 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #17114 from taosdata/feature/3_liaohj

refactor(query): do some internal refactor.
......@@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
* @param handle
* @return
*/
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal);
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch *pLocal);
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
/**
......
......@@ -70,11 +70,10 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t nowUs = taosGetTimestampUs();
int64_t duration = nowUs - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%.2f ms, "
"current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
......@@ -85,11 +84,12 @@ static void deregisterRequest(SRequestObj *pRequest) {
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
"us, planner:%" PRId64 "us, exec:%" PRId64 "us",
"us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%"PRIx64,
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.planEnd - pRequest->metric.semanticEnd,
pRequest->metric.resultReady - pRequest->metric.planEnd);
pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
}
......
......@@ -815,7 +815,7 @@ int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
if (NULL == pRequest->body.resInfo.execRes.res) {
return TSDB_CODE_SUCCESS;
return pRequest->code;
}
SCatalog* pCatalog = NULL;
......@@ -868,10 +868,12 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
return code;
}
//todo refacto the error code mgmt
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*)param;
pRequest->code = code;
STscObj* pTscObj = pRequest->pTscObj;
pRequest->code = code;
pRequest->metric.resultReady = taosGetTimestampUs();
if (pResult) {
......@@ -879,31 +881,28 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
}
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) {
int32_t type = pRequest->type;
if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
if (pResult) {
pRequest->body.resInfo.numOfRows = pResult->numOfRows;
if (TDMT_VND_SUBMIT == pRequest->type) {
STscObj* pTscObj = pRequest->pTscObj;
// record the insert rows
if (TDMT_VND_SUBMIT == type) {
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
}
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
pRequest->metric.execEnd = taosGetTimestampUs();
}
taosMemoryFree(pResult);
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
tstrerror(code), pRequest->requestId);
STscObj* pTscObj = pRequest->pTscObj;
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code;
schedulerFreeJob(&pRequest->body.queryJob, 0);
qDestroyQuery(pRequest->pQuery);
......@@ -917,7 +916,11 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
removeMeta(pTscObj, pRequest->targetTableList);
}
handleQueryExecRsp(pRequest);
pRequest->metric.execEnd = taosGetTimestampUs();
int32_t code1 = handleQueryExecRsp(pRequest);
if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
pRequest->code = code1;
}
// return to client
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
......
......@@ -700,6 +700,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SQuery *pQuery = pRequest->pQuery;
pRequest->metric.ctgEnd = taosGetTimestampUs();
qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
......@@ -723,13 +724,16 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
destorySqlParseWrapper(pWrapper);
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
pRequest->requestId);
double el = (pRequest->metric.semanticEnd - pRequest->metric.ctgEnd)/1000.0;
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, elapsed time:%.2f ms, reqId:0x%" PRIx64,
pRequest->self, el, pRequest->requestId);
launchAsyncQuery(pRequest, pQuery, pResultMeta);
} else {
destorySqlParseWrapper(pWrapper);
qDestroyQuery(pRequest->pQuery);
pRequest->pQuery = NULL;
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
......
......@@ -673,8 +673,8 @@ typedef struct {
} SSkmInfo;
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pBlockLoadInfo, STSchema *pSchema,
int16_t *pCols, int32_t numOfCols, const char *idStr);
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
......
......@@ -691,7 +691,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
while (1) {
uint64_t ts;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL);
bool hasMore = false;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
if (code < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
break;
......
......@@ -420,6 +420,7 @@ typedef enum {
typedef struct {
SFSLASTNEXTROWSTATES state; // [input]
STsdb *pTsdb; // [input]
STSchema *pTSchema;// [input]
tb_uid_t suid;
tb_uid_t uid;
int32_t nFileSet;
......@@ -455,9 +456,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
if (code) goto _err;
SSttBlockLoadInfo* pLoadInfo = tCreateLastBlockLoadInfo(state->pTSchema, NULL, 0);
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL, NULL, 0, NULL);
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, pLoadInfo,true, NULL);
bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET;
......@@ -892,6 +894,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
pIter->fsLastState.pTsdb = pTsdb;
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
pIter->fsLastState.pTSchema = pTSchema;
pIter->fsLastState.suid = suid;
pIter->fsLastState.uid = uid;
......
......@@ -474,8 +474,8 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
}
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, STSchema* pSchema,
int16_t* pCols, int32_t numOfCols, const char* idStr) {
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr) {
pMTree->backward = backward;
pMTree->pIter = NULL;
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
......@@ -488,22 +488,12 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
int32_t code = TSDB_CODE_SUCCESS;
SSttBlockLoadInfo* pLoadInfo = NULL;
if (pBlockLoadInfo == NULL) {
ASSERT(0);
if (pMTree->pLoadInfo == NULL) {
pMTree->destroyLoadInfo = true;
pMTree->pLoadInfo = tCreateLastBlockLoadInfo(pSchema, pCols, numOfCols);
}
pLoadInfo = pMTree->pLoadInfo;
} else {
pLoadInfo = pBlockLoadInfo;
}
pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo;
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
struct SLDataIter* pIter = NULL;
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pLoadInfo[i]);
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pMTree->pLoadInfo[i]);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
......
......@@ -951,15 +951,22 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
return TSDB_CODE_SUCCESS;
}
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData, uint64_t uid) {
int64_t st = taosGetTimestampUs();
tBlockDataReset(pBlockData);
TABLEID tid = {.suid = pReader->suid, .uid = uid};
int32_t code = tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols-1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
ASSERT(pBlockInfo != NULL);
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, code:%s %s",
......@@ -1998,8 +2005,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
int32_t code =
tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo,
pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols, pReader->idStr);
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
......@@ -2455,14 +2461,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
ASSERT(pBlockIter->numOfBlocks == 0);
code = buildComposedDataBlock(pReader);
} else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
tBlockDataReset(&pStatus->fileBlockData);
TABLEID tid = {.suid = pReader->suid, .uid = pScanInfo->uid};
code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2936,14 +2935,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block
tBlockDataReset(&pStatus->fileBlockData);
TABLEID tid = {.suid = pReader->suid, .uid = pFBlock->uid};
int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pFBlock->uid);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -3701,15 +3693,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
tBlockDataReset(&pStatus->fileBlockData);
TABLEID tid = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData, 1);
terrno = code;
......
......@@ -479,7 +479,7 @@ static void freeBlock(void* param) {
blockDataDestroy(pBlock);
}
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) {
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
......@@ -536,6 +536,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL
}
}
*hasMore = (pRes != NULL);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
......
......@@ -107,9 +107,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
// if *taskHandle is NULL, it's killed right now
bool hasMore = false;
if (taskHandle) {
qwDbgSimulateSleep();
code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch);
if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
......@@ -122,20 +125,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
++execNum;
if (taosArrayGetSize(pResList) == 0) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
dsEndPut(sinkHandle, useconds);
QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryStop) {
*queryStop = true;
}
break;
}
for (int32_t j = 0; j < taosArrayGetSize(pResList); ++j) {
size_t numOfResBlock = taosArrayGetSize(pResList);
for (int32_t j = 0; j < numOfResBlock; ++j) {
SSDataBlock *pRes = taosArrayGetP(pResList, j);
ASSERT(pRes->info.rows > 0);
......@@ -149,6 +140,23 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
}
if (numOfResBlock == 0 || (hasMore == false)) {
if (numOfResBlock == 0) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
} else {
QW_TASK_DLOG("qExecTask done", "");
}
dsEndPut(sinkHandle, useconds);
QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryStop) {
*queryStop = true;
}
break;
}
if (!qcontinue) {
if (queryStop) {
*queryStop = true;
......
......@@ -47,8 +47,7 @@ void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
return;
_return:
SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
}
bool schJobDone(SSchJob *pJob) {
......@@ -491,7 +490,7 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
int32_t code = atomic_load_32(&pJob->errCode);
if (code) {
SCH_JOB_DLOG("job failed with error: %s", tstrerror(code));
SCH_JOB_DLOG("job failed with error %s", tstrerror(code));
}
schPostJobRes(pJob, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册