From 9031c50ca4cc9b77b04cc1dc59da1f0cd30e203f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Aug 2020 01:22:36 +0800 Subject: [PATCH] [td-1103] fix bugs --- src/common/src/tvariant.c | 12 +++++++++++- src/query/inc/qExecutor.h | 2 +- src/query/inc/qUtil.h | 4 ++-- src/query/inc/tsqlfunction.h | 10 +++++----- src/query/src/qExecutor.c | 22 +++++++++++++++------- src/query/src/qTsbuf.c | 13 +++++++------ 6 files changed, 41 insertions(+), 22 deletions(-) diff --git a/src/common/src/tvariant.c b/src/common/src/tvariant.c index 6716a1827e..6e8111aa72 100644 --- a/src/common/src/tvariant.c +++ b/src/common/src/tvariant.c @@ -175,7 +175,17 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) { } int32_t tVariantCompare(const tVariant* p1, const tVariant* p2) { - assert((p1->nType != TSDB_DATA_TYPE_NULL) || (p2->nType != TSDB_DATA_TYPE_NULL)); + if (p1->nType == TSDB_DATA_TYPE_NULL && p2->nType == TSDB_DATA_TYPE_NULL) { + return 0; + } + + if (p1->nType == TSDB_DATA_TYPE_NULL) { + return -1; + } + + if (p2->nType == TSDB_DATA_TYPE_NULL) { + return 1; + } switch (p1->nType) { case TSDB_DATA_TYPE_BINARY: diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 320ae8d137..5126dd347e 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -118,7 +118,7 @@ typedef struct SQueryCostInfo { uint32_t loadBlockStatis; uint32_t discardBlocks; uint64_t elapsedTime; - uint64_t computTime; + uint64_t firstStageMergeTime; uint64_t internalSupSize; uint64_t numOfTimeWindows; } SQueryCostInfo; diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 98b0ed0011..6de3c7c0e5 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -38,8 +38,8 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf return &pWindowResInfo->pResult[slot]; } -#define curTimeWindowIndex(_winres) ((_winres)->curIndex) -#define GET_TIMEWINDOW(_winresInfo, _win) (STimeWindow) {(_win)->skey, ((_win)->skey + (_winresInfo)->interval)} +#define curTimeWindowIndex(_winres) ((_winres)->curIndex) +#define GET_TIMEWINDOW(_winresInfo, _win) (STimeWindow) {(_win)->skey, ((_win)->skey + (_winresInfo)->interval - 1)} #define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1) bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot); diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 1dafe15989..a443a43a23 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -138,11 +138,11 @@ typedef struct SInterpInfoDetail { typedef struct SResultInfo { int8_t hasResult; // result generated, not NULL value - bool initialized:1; // output buffer has been initialized - bool complete:1; // query has completed - bool superTableQ:1; // is super table query - int16_t numOfRes; // num of output result in current buffer - uint32_t bufLen; // buffer size + bool initialized; // output buffer has been initialized + bool complete; // query has completed + bool superTableQ; // is super table query + uint32_t bufLen; // buffer size + uint64_t numOfRes; // num of output result in current buffer void* interResultBuf; // output result buffer } SResultInfo; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 67464c36eb..c98bac96ba 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -970,7 +970,6 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * int32_t index = pWindowResInfo->curIndex; STimeWindow nextWin = win; - assert(tsCols != NULL); while (1) { int32_t prevEndPos = (forwardStep - 1) * step + startPos; @@ -2668,7 +2667,7 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) } int32_t mergeIntoGroupResult(SQInfo *pQInfo) { - int64_t st = taosGetTimestampMs(); + int64_t st = taosGetTimestampUs(); int32_t ret = TSDB_CODE_SUCCESS; int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo)); @@ -2695,9 +2694,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { SET_STABLE_QUERY_OVER(pQInfo); } - qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "ms", pQInfo, - pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st); + int64_t elapsedTime = taosGetTimestampUs() - st; + qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pQInfo, + pQInfo->groupIndex - 1, numOfGroups, elapsedTime); + pQInfo->runtimeEnv.summary.firstStageMergeTime += elapsedTime; return TSDB_CODE_SUCCESS; } @@ -2830,6 +2831,13 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { while (1) { if (IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); + + taosTFree(pTableList); + taosTFree(posList); + taosTFree(pTree); + taosTFree(pResultInfo); + taosTFree(buf); + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -3996,9 +4004,9 @@ static void queryCostStatis(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryCostInfo *pSummary = &pRuntimeEnv->summary; - qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, total blocks:%d, load block statis:%d," - " load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, - pQInfo, pSummary->elapsedTime, pSummary->totalBlocks, pSummary->loadBlockStatis, + qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " + "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, + pQInfo, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); qDebug("QInfo:%p :cost summary: internal size:%"PRId64", numOfWin:%"PRId64, pQInfo, pSummary->internalSupSize, diff --git a/src/query/src/qTsbuf.c b/src/query/src/qTsbuf.c index a6fc3da05f..518bb4083b 100644 --- a/src/query/src/qTsbuf.c +++ b/src/query/src/qTsbuf.c @@ -232,7 +232,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) { TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET); - UNUSED(r); + assert(r == 0); /* * format for output data: @@ -241,13 +241,14 @@ static void writeDataToDisk(STSBuf* pTSBuf) { * * both side has the compressed length is used to support load data forwards/backwords. */ - fwrite(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f); - fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f); + int32_t metaLen = 0; + metaLen += fwrite(&pBlock->tag.nType, 1, sizeof(pBlock->tag.nType), pTSBuf->f); + metaLen += fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) { - fwrite(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f); + metaLen += fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f); } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { - fwrite(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f); + metaLen += fwrite(&pBlock->tag.i64Key, 1, sizeof(int64_t), pTSBuf->f); } fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); @@ -255,7 +256,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) { fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); - int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; + int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; pTSBuf->fileSize += blockSize; pTSBuf->tsData.len = 0; -- GitLab