diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 411b9b9470cbffcbbe3d47a5f9a142b0a97f3699..bc26abe0a432814f6499e943aeaa2ce6b4f42476 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -31,10 +31,6 @@ extern "C" { #include "tsqlfunction.h" #include "tutil.h" -//#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \ -// ((res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) + \ -// (1 - ord.order) * (res->numOfRows - 1) * tscFieldInfoGetField(_queryinfo, col)->bytes) - #define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \ (res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows) @@ -117,13 +113,6 @@ typedef struct SColumnBaseInfo { struct SLocalReducer; -// todo move to utility -typedef struct SString { - int32_t alloc; - int32_t n; - char * z; -} SString; - typedef struct SCond { uint64_t uid; char * cond; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 32ddb6aee676016b3efb377f89dd64326dba9e8e..34b666604714ad72672e9a3e4ada638e6877123a 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2890,24 +2890,17 @@ static FORCE_INLINE void date_col_output_function_f(SQLFunctionCtx *pCtx, int32_ static void col_project_function(SQLFunctionCtx *pCtx) { INC_INIT_VAL(pCtx, pCtx->size); - char *pDest = 0; -// if (pCtx->order == TSQL_SO_ASC) { - pDest = pCtx->aOutputBuf; -// } else { -// pDest = pCtx->aOutputBuf - (pCtx->size - 1) * pCtx->inputBytes; -// } - char *pData = GET_INPUT_CHAR(pCtx); if (pCtx->order == TSQL_SO_ASC) { - memcpy(pDest, pData, (size_t)pCtx->size * pCtx->inputBytes); + memcpy(pCtx->aOutputBuf, pData, (size_t)pCtx->size * pCtx->inputBytes); } else { for(int32_t i = 0; i < pCtx->size; ++i) { - memcpy(pDest + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes, + memcpy(pCtx->aOutputBuf + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes, pCtx->inputBytes); } } - pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; + pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes; } static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { @@ -3229,7 +3222,7 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) { GET_RES_INFO(pCtx)->numOfRes += 1; } - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); + int32_t step = 1/*GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; switch (pCtx->inputType) { case TSDB_DATA_TYPE_INT: { diff --git a/src/inc/hashutil.h b/src/inc/hashutil.h index e4c044abb088b320c52a38ee22cf4d25f001d7f4..047f1889d78d6f8559bd0e320a0e9bae2beaa681 100644 --- a/src/inc/hashutil.h +++ b/src/inc/hashutil.h @@ -37,7 +37,6 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len); */ uint32_t taosIntHash_32(const char *key, uint32_t len); - uint32_t taosIntHash_64(const char *key, uint32_t len); _hash_fn_t taosGetDefaultHashFunction(int32_t type); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 4b405bdf6b806cfdf47183b349a3295efa0478c9..b0994c02da4d90eb74ce1dc10014d173230a18bb 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1485,9 +1485,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalSKey : pRuntimeEnv->intervalEKey; - - // int64_t alignedTimestamp = - // taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); } @@ -1980,7 +1977,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo); if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { - qTrace("not completed in supplementary scan, ignore\n"); +// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld", +// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey); continue; } @@ -2012,7 +2010,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo); if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { - qTrace("not completed in supplementary scan, ignore"); +// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld", +// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey); continue; } @@ -2068,7 +2067,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * free(sasArray); - if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) { + if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; // query completed @@ -2099,9 +2098,12 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * pWindowResInfo->prevSKey = skey; // the number of completed slots are larger than the threshold, dump to client immediately. - if (numOfResFromResWindowInfo(pWindowResInfo) > pWindowResInfo->threshold) { + int32_t v = numOfResFromResWindowInfo(pWindowResInfo); + if (v > pWindowResInfo->threshold) { setQueryStatus(pQuery, QUERY_RESBUF_FULL); } + + dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v); } } @@ -2110,7 +2112,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * * because the results of group by normal column is put into intermediate buffer. */ int32_t num = 0; - if (!groupbyStateValue) { + if (!groupbyStateValue && !(pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; } @@ -4500,7 +4502,8 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) } if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; - initResWindowInfo(&pRuntimeEnv->swindowResInfo, 10039, TSDB_DATA_TYPE_BIGINT, pSupporter->pResult); + int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); + initResWindowInfo(&pRuntimeEnv->swindowResInfo, 10039, type, pSupporter->pResult); } if (pQuery->nAggTimeInterval != 0) { @@ -4917,10 +4920,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (1) { // check if query is killed or not set the status of query to pass the status check -// if (isQueryKilled(pQuery)) { -// setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); -// return cnt; -// } + if (isQueryKilled(pQuery)) { + setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); + return cnt; + } int32_t numOfRes = 0; SBlockInfo blockInfo = {0}; @@ -5745,7 +5748,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); +// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); // reset the execution contexts for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -5765,7 +5768,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { * * diff function is handled in multi-output function */ - pRuntimeEnv->pCtx[j].ptsOutputBuf += TSDB_KEYSIZE * output * factor; + pRuntimeEnv->pCtx[j].ptsOutputBuf += TSDB_KEYSIZE * output/* * factor*/; } resetResultInfo(pRuntimeEnv->pCtx[j].resultInfo); @@ -5856,12 +5859,15 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus SWAP(pQuery->skey, pQuery->ekey, TSKEY); pQuery->lastKey = pQuery->skey; pRuntimeEnv->startPos = pRuntimeEnv->endPos; + + SWAP(pRuntimeEnv->intervalSKey, pRuntimeEnv->intervalEKey, TSKEY); } static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; SWAP(pQuery->skey, pQuery->ekey, TSKEY); - + SWAP(pRuntimeEnv->intervalSKey, pRuntimeEnv->intervalEKey, TSKEY); + pQuery->lastKey = pStatus->lastKey; pQuery->skey = pStatus->skey; pQuery->ekey = pStatus->ekey; @@ -5883,6 +5889,8 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { return; } + dTrace("QInfo:%p start to supp scan", GET_QINFO_ADDR(pQuery)); + SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); // usually this load operation will incur load disk block operation @@ -5923,7 +5931,8 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { /* store the start query position */ savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos); - + int64_t skey = pQuery->lastKey; + while (1) { doScanAllDataBlocks(pRuntimeEnv); @@ -5978,26 +5987,14 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { } } + int64_t newSkey = pQuery->skey; + pQuery->skey = skey; + doSingleMeterSupplementScan(pRuntimeEnv); - - // reset status code - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { - SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; - - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SOutputRes *buf = &pWindowResInfo->pResult[i]; - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - buf->resultInfo[j].complete = false; - } - } - } else { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]); - if (pResInfo != NULL) { - pResInfo->complete = false; - } - } - } + + // update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during + // supplementary scan + pQuery->skey = newSkey; } void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) { @@ -6006,10 +6003,15 @@ void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { // for each group result, call the finalize function for each column SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; - + bool groupbyCol = isGroupbyNormalCol(pQuery->pGroupbyExpr); + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SOutputRes *buf = &pWindowResInfo->pResult[i]; SWindowStatus* pStatus = &pWindowResInfo->pStatus[i]; + if (groupbyCol) { + pStatus->closed = true; + } + if (!pStatus->closed) { continue; } diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 5089b1b79a70de814cc8bddce3ccc6d02265d677..73cb456f0b0f11ca4b7b0ee5127b7fc49abb01e3 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -85,6 +85,29 @@ static void setStartPositionForCacheBlock(SQuery *pQuery, SCacheBlock *pBlock, b } } +static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + // enable execution for next table + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { + SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; + + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SOutputRes *buf = &pWindowResInfo->pResult[i]; + for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + buf->resultInfo[j].complete = false; + } + } + } else { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]); + if (pResInfo != NULL) { + pResInfo->complete = false; + } + } + } +} + static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { SQuery * pQuery = &pQInfo->query; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; @@ -548,7 +571,10 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start pointInterpSupporterDestroy(&pointInterpSupporter); vnodeScanAllData(pRuntimeEnv); - + + // enable execution for next table + enableExecutionForNextTable(pRuntimeEnv); + // first/last_row query, do not invoke the finalize for super table query if (!isFirstLastRowQuery(pQuery)) { doFinalizeResult(pRuntimeEnv); @@ -725,7 +751,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pSupporter->meterIdx = pSupporter->pSidSet->numOfSids; break; } - + + // enable execution for next table + enableExecutionForNextTable(pRuntimeEnv); + if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { /* * query range is identical in terms of all meters involved in query, @@ -738,10 +767,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pSupporter->meterIdx++; // if the buffer is full or group by each table, we need to jump out of the loop -// if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) || -// isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) { + if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) || + isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)) { break; -// } + } } else { // forward query range @@ -768,11 +797,14 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; } + // todo refactor if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { SSlidingWindowResInfo* pWindowResInfo = &pRuntimeEnv->swindowResInfo; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SOutputRes *buf = &pWindowResInfo->pResult[i]; + pWindowResInfo->pStatus[i].closed = true; // enable return all results for group by normal columns + for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { buf->numOfRows = MAX(buf->numOfRows, buf->resultInfo[j].numOfRes); } diff --git a/src/util/src/thashutil.c b/src/util/src/thashutil.c index 94961e356e66fea01b35cee9d5ea90e2dbd794ce..cf16efe2f8e539f9611952111bafc5d4ff214d3e 100644 --- a/src/util/src/thashutil.c +++ b/src/util/src/thashutil.c @@ -103,4 +103,4 @@ _hash_fn_t taosGetDefaultHashFunction(int32_t type) { } return fn; - } \ No newline at end of file +} \ No newline at end of file