提交 4afd80c5 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch 'master' of https://github.com/taosdata/TDengine

...@@ -279,8 +279,9 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -279,8 +279,9 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pRes->pLocalReducer = pReducer; pRes->pLocalReducer = pReducer;
pRes->numOfGroups = 0; pRes->numOfGroups = 0;
int16_t prec = pCmd->pMeterMeta->precision;
int64_t stime = (pCmd->stime < pCmd->etime) ? pCmd->stime : pCmd->etime; int64_t stime = (pCmd->stime < pCmd->etime) ? pCmd->stime : pCmd->etime;
int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit); int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit, prec);
SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo; SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo;
taosInitInterpoInfo(pInterpoInfo, pCmd->order.order, revisedSTime, pCmd->groupbyExpr.numOfGroupbyCols, taosInitInterpoInfo(pInterpoInfo, pCmd->order.order, revisedSTime, pCmd->groupbyExpr.numOfGroupbyCols,
...@@ -664,12 +665,11 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource * ...@@ -664,12 +665,11 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
} }
} }
void savePrevRecordAndSetupInterpoInfo( void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SSqlCmd *pCmd, SInterpolationInfo *pInterpoInfo) {
SLocalReducer *pLocalReducer, SSqlCmd *pCmd, // discard following dataset in the same group and reset the interpolation information
SInterpolationInfo int16_t prec = pCmd->pMeterMeta->precision;
*pInterpoInfo) { // discard following dataset in the same group and reset the interpolation information
int64_t stime = (pCmd->stime < pCmd->etime) ? pCmd->stime : pCmd->etime; int64_t stime = (pCmd->stime < pCmd->etime) ? pCmd->stime : pCmd->etime;
int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit); int64_t revisedSTime = taosGetIntervalStartTimestamp(stime, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit, prec);
taosInitInterpoInfo(pInterpoInfo, pCmd->order.order, revisedSTime, pCmd->groupbyExpr.numOfGroupbyCols, taosInitInterpoInfo(pInterpoInfo, pCmd->order.order, revisedSTime, pCmd->groupbyExpr.numOfGroupbyCols,
pLocalReducer->rowSize); pLocalReducer->rowSize);
...@@ -811,9 +811,12 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo ...@@ -811,9 +811,12 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
functions[i] = tscSqlExprGet(pCmd, i)->sqlFuncId; functions[i] = tscSqlExprGet(pCmd, i)->sqlFuncId;
} }
int8_t precision = pCmd->pMeterMeta->precision;
while (1) { while (1) {
int32_t remains = taosNumOfRemainPoints(pInterpoInfo); int32_t remains = taosNumOfRemainPoints(pInterpoInfo);
TSKEY etime = taosGetRevisedEndKey(actualETime, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit); TSKEY etime = taosGetRevisedEndKey(actualETime, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit,
precision);
int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pCmd->nAggTimeInterval, etime, int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pCmd->nAggTimeInterval, etime,
pLocalReducer->resColModel->maxCapacity); pLocalReducer->resColModel->maxCapacity);
...@@ -1115,10 +1118,12 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer ...@@ -1115,10 +1118,12 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
pRes->numOfTotal = 0; pRes->numOfTotal = 0;
pCmd->limit.offset = pLocalReducer->offset; pCmd->limit.offset = pLocalReducer->offset;
int16_t precision = pCmd->pMeterMeta->precision;
if (pCmd->interpoType != TSDB_INTERPO_NONE) { if (pCmd->interpoType != TSDB_INTERPO_NONE) {
/* for group result interpolation, do not return if not data is generated */ /* for group result interpolation, do not return if not data is generated */
int64_t stime = (pCmd->stime < pCmd->etime) ? pCmd->stime : pCmd->etime; int64_t stime = (pCmd->stime < pCmd->etime) ? pCmd->stime : pCmd->etime;
int64_t newTime = taosGetIntervalStartTimestamp(stime, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit); int64_t newTime = taosGetIntervalStartTimestamp(stime, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit, precision);
taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pCmd->order.order, newTime, taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pCmd->order.order, newTime,
pCmd->groupbyExpr.numOfGroupbyCols, pLocalReducer->rowSize); pCmd->groupbyExpr.numOfGroupbyCols, pLocalReducer->rowSize);
...@@ -1135,6 +1140,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { ...@@ -1135,6 +1140,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
SLocalReducer * pLocalReducer = pRes->pLocalReducer; SLocalReducer * pLocalReducer = pRes->pLocalReducer;
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
int8_t p = pCmd->pMeterMeta->precision;
if (taosHasRemainsDataForInterpolation(pInterpoInfo)) { if (taosHasRemainsDataForInterpolation(pInterpoInfo)) {
assert(pCmd->interpoType != TSDB_INTERPO_NONE); assert(pCmd->interpoType != TSDB_INTERPO_NONE);
...@@ -1143,7 +1149,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { ...@@ -1143,7 +1149,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pInterpoInfo->numOfRawDataInRows - 1)); int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pInterpoInfo->numOfRawDataInRows - 1));
int32_t remain = taosNumOfRemainPoints(pInterpoInfo); int32_t remain = taosNumOfRemainPoints(pInterpoInfo);
TSKEY ekey = taosGetRevisedEndKey(etime, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit); TSKEY ekey = taosGetRevisedEndKey(etime, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit, p);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain, int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain,
pCmd->nAggTimeInterval, ekey, pLocalReducer->resColModel->maxCapacity); pCmd->nAggTimeInterval, ekey, pLocalReducer->resColModel->maxCapacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
...@@ -1164,6 +1170,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ...@@ -1164,6 +1170,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow; bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
int8_t precision = pCmd->pMeterMeta->precision;
if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL || if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) { prevGroupCompleted) {
...@@ -1171,7 +1178,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { ...@@ -1171,7 +1178,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
if (pCmd->interpoType != TSDB_INTERPO_NONE) { if (pCmd->interpoType != TSDB_INTERPO_NONE) {
int64_t etime = (pCmd->stime < pCmd->etime) ? pCmd->etime : pCmd->stime; int64_t etime = (pCmd->stime < pCmd->etime) ? pCmd->etime : pCmd->stime;
etime = taosGetRevisedEndKey(etime, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit); etime = taosGetRevisedEndKey(etime, pCmd->order.order, pCmd->nAggTimeInterval, pCmd->intervalTimeUnit, precision);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pCmd->nAggTimeInterval, etime, int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pCmd->nAggTimeInterval, etime,
pLocalReducer->resColModel->maxCapacity); pLocalReducer->resColModel->maxCapacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
......
...@@ -40,14 +40,14 @@ typedef struct SPoint { ...@@ -40,14 +40,14 @@ typedef struct SPoint {
typedef void (*__interpo_callback_fn_t)(void *param); typedef void (*__interpo_callback_fn_t)(void *param);
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit); int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision);
void taosInitInterpoInfo(SInterpolationInfo *pInterpoInfo, int32_t order, int64_t startTimeStamp, int32_t numOfTags, void taosInitInterpoInfo(SInterpolationInfo *pInterpoInfo, int32_t order, int64_t startTimeStamp, int32_t numOfTags,
int32_t rowSize); int32_t rowSize);
void taosInterpoSetStartInfo(SInterpolationInfo *pInterpoInfo, int32_t numOfRawDataInRows, int32_t type); void taosInterpoSetStartInfo(SInterpolationInfo *pInterpoInfo, int32_t numOfRawDataInRows, int32_t type);
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit); TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision);
/** /**
* *
......
...@@ -60,8 +60,13 @@ ...@@ -60,8 +60,13 @@
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap #define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap #define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch #define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch #define __sync_add_and_fetch_32 __sync_add_and_fetch
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
int32_t __sync_val_load_32(int32_t *ptr); int32_t __sync_val_load_32(int32_t *ptr);
void __sync_val_restore_32(int32_t *ptr, int32_t newval); void __sync_val_restore_32(int32_t *ptr, int32_t newval);
......
...@@ -278,14 +278,14 @@ typedef struct { ...@@ -278,14 +278,14 @@ typedef struct {
TSKEY ekey; TSKEY ekey;
int64_t nAggTimeInterval; int64_t nAggTimeInterval;
char intervalTimeUnit; // interval data type, used for daytime revise char intervalTimeUnit; // interval data type, used for daytime revise
char precision;
int16_t numOfOutputCols; int16_t numOfOutputCols;
int16_t interpoType; int16_t interpoType;
int16_t checkBufferInLoop; // check if the buffer is full during scan each block int16_t checkBufferInLoop; // check if the buffer is full during scan each block
SLimitVal limit; SLimitVal limit;
int32_t rowSize; int32_t rowSize;
int32_t dataRowSize; // row size of each loaded data from disk, the value is int32_t dataRowSize; // row size of each loaded data from disk, the value is
// used for prepare buffer // used for prepare buffer
SSqlGroupbyExpr * pGroupbyExpr; SSqlGroupbyExpr * pGroupbyExpr;
SSqlFunctionExpr *pSelectExpr; SSqlFunctionExpr *pSelectExpr;
......
...@@ -810,14 +810,14 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slo ...@@ -810,14 +810,14 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQuery *pQuery, int32_t slo
SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot]; SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot];
if (pBlock == NULL) { if (pBlock == NULL) {
dError("QInfo:%p NULL Block In Cache, available block:%d, last block:%d, accessed null block:%d, pBlockId:%d", dError("QInfo:%p NULL Block In Cache, available block:%d, last block:%d, accessed null block:%d, pBlockId:%d",
GET_QINFO_ADDR(pQuery), pCacheInfo->numOfBlocks, pCacheInfo->currentSlot, slot, pQuery->blockId); GET_QINFO_ADDR(pQuery), pCacheInfo->numOfBlocks, pCacheInfo->currentSlot, slot, pQuery->blockId);
return NULL; return NULL;
} }
if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId || pBlock->numOfPoints <= 0) { if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) {
dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d", dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, blockMeterObj:%p",
GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId, GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId,
pQuery->blockId); pQuery->blockId, pMeterObj, pBlock->pMeterObj);
return NULL; return NULL;
} }
...@@ -1105,7 +1105,8 @@ static int32_t applyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardS ...@@ -1105,7 +1105,8 @@ static int32_t applyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardS
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey; TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey;
int64_t alignedTimestamp = taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit); int64_t alignedTimestamp = taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit,
pQuery->precision);
setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, forwardStep, functionId,
tpField, hasNull, pRuntimeEnv->blockStatus, &sas, pRuntimeEnv->scanFlag); tpField, hasNull, pRuntimeEnv->blockStatus, &sas, pRuntimeEnv->scanFlag);
...@@ -1190,7 +1191,8 @@ static int32_t applyAllFunctions_Filter(SQueryRuntimeEnv *pRuntimeEnv, int32_t * ...@@ -1190,7 +1191,8 @@ static int32_t applyAllFunctions_Filter(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
char *dataBlock = getDataBlocks(pRuntimeEnv, data, &sasArray[k], k, isDiskFileBlock); char *dataBlock = getDataBlocks(pRuntimeEnv, data, &sasArray[k], k, isDiskFileBlock);
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey; TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey;
int64_t alignedTimestamp = taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit); int64_t alignedTimestamp = taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit,
pQuery->precision);
setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId,
pFields, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); pFields, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
...@@ -2096,7 +2098,8 @@ static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t qualified ...@@ -2096,7 +2098,8 @@ static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t qualified
return; return;
} }
*skey = taosGetIntervalStartTimestamp(qualifiedKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit); *skey = taosGetIntervalStartTimestamp(qualifiedKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit,
pQuery->precision);
int64_t endKey = *skey + pQuery->nAggTimeInterval - 1; int64_t endKey = *skey + pQuery->nAggTimeInterval - 1;
if (*skey < keyFirst) { if (*skey < keyFirst) {
...@@ -3276,7 +3279,8 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete ...@@ -3276,7 +3279,8 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t rs = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit); int64_t rs = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit,
pQuery->precision);
taosInitInterpoInfo(&pSupporter->runtimeEnv.interpoInfo, pQuery->order.order, rs, 0, 0); taosInitInterpoInfo(&pSupporter->runtimeEnv.interpoInfo, pQuery->order.order, rs, 0, 0);
allocMemForInterpo(pSupporter, pQuery, pMeterObj); allocMemForInterpo(pSupporter, pQuery, pMeterObj);
...@@ -3431,8 +3435,8 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -3431,8 +3435,8 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery) {
pQuery->interpoType = TSDB_INTERPO_NONE; pQuery->interpoType = TSDB_INTERPO_NONE;
} }
TSKEY revisedStime = TSKEY revisedStime = taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->nAggTimeInterval,
taosGetIntervalStartTimestamp(pSupporter->rawSKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit); pQuery->intervalTimeUnit, pQuery->precision);
taosInitInterpoInfo(&pSupporter->runtimeEnv.interpoInfo, pQuery->order.order, revisedStime, 0, 0); taosInitInterpoInfo(&pSupporter->runtimeEnv.interpoInfo, pQuery->order.order, revisedStime, 0, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -6117,7 +6121,7 @@ bool vnodeHasRemainResults(void *handle) { ...@@ -6117,7 +6121,7 @@ bool vnodeHasRemainResults(void *handle) {
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
/* query has completed */ /* query has completed */
TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->nAggTimeInterval, TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->nAggTimeInterval,
pQuery->intervalTimeUnit); pQuery->intervalTimeUnit, pQuery->precision);
int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data, int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data,
remain, pQuery->nAggTimeInterval, ekey, pQuery->pointsToRead); remain, pQuery->nAggTimeInterval, ekey, pQuery->pointsToRead);
return numOfTotal > 0; return numOfTotal > 0;
...@@ -6201,7 +6205,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage ...@@ -6201,7 +6205,7 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo);
TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->nAggTimeInterval, TSKEY ekey = taosGetRevisedEndKey(pSupporter->rawEKey, pQuery->order.order, pQuery->nAggTimeInterval,
pQuery->intervalTimeUnit); pQuery->intervalTimeUnit, pQuery->precision);
int32_t numOfFinalRows = int32_t numOfFinalRows =
taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data, numOfRows, taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data, numOfRows,
pQuery->nAggTimeInterval, ekey, pQuery->pointsToRead); pQuery->nAggTimeInterval, ekey, pQuery->pointsToRead);
......
...@@ -27,6 +27,12 @@ ...@@ -27,6 +27,12 @@
#include "vnodeQueryImpl.h" #include "vnodeQueryImpl.h"
#define ALL_CACHE_BLOCKS_CHECKED(q) \
((q)->slot == (q)->currentSlot && QUERY_IS_ASC_QUERY(q) || (q)->slot == (q)->firstSlot && (!QUERY_IS_ASC_QUERY(q)))
#define FORWARD_CACHE_BLOCK_CHECK_SLOT(slot, step, maxblocks) (slot) = ((slot) + (step) + (maxblocks)) % (maxblocks);
static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataInfo *pMeterInfo) { static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataInfo *pMeterInfo) {
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
SQuery * pQuery = &pQInfo->query; SQuery * pQuery = &pQInfo->query;
...@@ -49,6 +55,22 @@ static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataI ...@@ -49,6 +55,22 @@ static bool doCheckWithPrevQueryRange(SQInfo *pQInfo, TSKEY nextKey, SMeterDataI
return true; return true;
} }
/**
* The start position of the first check cache block is located before starting the loop.
* And the start position for next cache blocks needs to be decided before checking each cache block.
*/
static void setStartPositionForCacheBlock(SQuery *pQuery, SCacheBlock *pBlock, bool *firstCheckSlot) {
if (!(*firstCheckSlot)) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
pQuery->pos = 0;
} else {
pQuery->pos = pBlock->numOfPoints - 1;
}
} else {
(*firstCheckSlot) = false;
}
}
static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) { static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
SQuery * pQuery = &pQInfo->query; SQuery * pQuery = &pQInfo->query;
SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter;
...@@ -147,24 +169,39 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe ...@@ -147,24 +169,39 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
continue; continue;
} }
bool firstCheckSlot = true;
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
for (int32_t i = 0; i < pCacheInfo->maxBlocks; ++i) { for (int32_t i = 0; i < pCacheInfo->maxBlocks; ++i) {
pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot); pBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
// cache block may be flushed to disk, so it is not available, ignore it and try next /*
if (pBlock == NULL) { * 1. pBlock == NULL. The cache block may be flushed to disk, so it is not available, skip and try next
pQuery->slot = (pQuery->slot + step + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks; *
* 2. pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache
* procedure. The block has been allocated but data has not been put into yet. If the block is the last
* block(newly allocated block), abort query. Otherwise, skip it and go on.
*/
if ((pBlock == NULL) || (pBlock->numOfPoints == 0)) {
if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) {
break;
}
FORWARD_CACHE_BLOCK_CHECK_SLOT(pQuery->slot, step, pCacheInfo->maxBlocks);
continue; continue;
} }
setStartPositionForCacheBlock(pQuery, pBlock, &firstCheckSlot);
TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0]; TSKEY *primaryKeys = (TSKEY *)pBlock->offset[0];
// in handling file data block, this query condition is checked during fetching candidate file blocks
// in handling file data block, the timestamp range validation is done during fetching candidate file blocks
if ((primaryKeys[pQuery->pos] > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || if ((primaryKeys[pQuery->pos] > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(primaryKeys[pQuery->pos] < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) { (primaryKeys[pQuery->pos] < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) {
break; break;
} }
/* only record the key on last block */ // only record the key on last block
SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus); SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus);
SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK); SBlockInfo binfo = getBlockBasicInfo(pBlock, BLK_CACHE_BLOCK);
...@@ -176,24 +213,11 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe ...@@ -176,24 +213,11 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pBlock, &binfo, &pMeterInfo[k], NULL, queryOnBlock(pSupporter, primaryKeys, pRuntimeEnv->blockStatus, (char *)pBlock, &binfo, &pMeterInfo[k], NULL,
searchFn); searchFn);
// todo refactor if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) {
if ((pQuery->slot == pQuery->currentSlot && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->slot == pQuery->firstSlot && !QUERY_IS_ASC_QUERY(pQuery))) {
break; break;
} }
// try next cache block FORWARD_CACHE_BLOCK_CHECK_SLOT(pQuery->slot, step, pCacheInfo->maxBlocks);
pQuery->slot = (pQuery->slot + step + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks;
if (QUERY_IS_ASC_QUERY(pQuery)) {
pQuery->pos = 0;
} else { // backwards traverse encounter the cache invalid, abort scan cache.
SCacheBlock *pNextBlock = getCacheDataBlock(pMeterObj, pQuery, pQuery->slot);
if (pNextBlock == NULL) {
break; // todo fix
} else {
pQuery->pos = pNextBlock->numOfPoints - 1;
}
}
} }
} }
} }
......
...@@ -222,6 +222,8 @@ static SQInfo *vnodeAllocateQInfoCommon(SQueryMeterMsg *pQueryMsg, SMeterObj *pM ...@@ -222,6 +222,8 @@ static SQInfo *vnodeAllocateQInfoCommon(SQueryMeterMsg *pQueryMsg, SMeterObj *pM
} }
vnodeUpdateFilterColumnIndex(pQuery); vnodeUpdateFilterColumnIndex(pQuery);
pQuery->precision = vnodeList[pMeterObj->vnode].cfg.precision;
return pQInfo; return pQInfo;
_clean_memory: _clean_memory:
......
...@@ -614,7 +614,7 @@ static FORCE_INLINE void taosDecRef(SDataNode *pNode) { ...@@ -614,7 +614,7 @@ static FORCE_INLINE void taosDecRef(SDataNode *pNode) {
} }
if (pNode->refCount > 0) { if (pNode->refCount > 0) {
__sync_add_and_fetch_32(&pNode->refCount, -1); __sync_sub_and_fetch_32(&pNode->refCount, 1);
pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount); pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount);
} else { } else {
/* /*
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC) #define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC)
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit) { int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision) {
if (timeRange == 0) { if (timeRange == 0) {
return startTime; return startTime;
} }
...@@ -39,16 +39,17 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char ...@@ -39,16 +39,17 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char
* *
* TODO dynmaically decide the start time of a day * TODO dynmaically decide the start time of a day
*/ */
#ifdef _MSC_VER
#if _MSC_VER >= 1900 #if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone; int64_t timezone = _timezone;
int32_t daylight = _daylight; int32_t daylight = _daylight;
char** tzname = _tzname; char** tzname = _tzname;
#endif #endif
#endif
int64_t revStartime = (startTime / timeRange) * timeRange + timezone * MILLISECOND_PER_SECOND; int64_t t = (precision == TSDB_TIME_PRECISION_MILLI)?MILLISECOND_PER_SECOND:MILLISECOND_PER_SECOND*1000L;
int64_t revStartime = (startTime / timeRange) * timeRange + timezone * t;
int64_t revEndtime = revStartime + timeRange - 1; int64_t revEndtime = revStartime + timeRange - 1;
if (revEndtime < startTime) { if (revEndtime < startTime) {
revStartime += timeRange; revStartime += timeRange;
...@@ -85,11 +86,11 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD ...@@ -85,11 +86,11 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD
pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows; pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows;
} }
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit) { TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision) {
if (order == TSQL_SO_ASC) { if (order == TSQL_SO_ASC) {
return ekey; return ekey;
} else { } else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, intervalTimeUnit); return taosGetIntervalStartTimestamp(ekey, timeInterval, intervalTimeUnit, precision);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册