diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 47460a5fab34289e5534fc753ff4b4bd86452d65..e35e805efefb7c017dd5c0c828023ecbe6e0c460 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -155,7 +155,7 @@ extern char tsMnodeTmpDir[]; extern char tsDataDir[]; extern char tsLogDir[]; extern char tsScriptDir[]; -extern int64_t tsMsPerDay[3]; +extern int64_t tsTickPerDay[3]; // system info extern char tsOsName[]; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index af21ef6d82f96de6c2d0349911b3176f14510ee2..51d44add4c1672b1251be50941b49e5f87b3c019 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -204,7 +204,7 @@ SDiskCfg tsDiskCfg[TSDB_MAX_DISKS]; * TSDB_TIME_PRECISION_MICRO: 86400000000L * TSDB_TIME_PRECISION_NANO: 86400000000000L */ -int64_t tsMsPerDay[] = {86400000L, 86400000000L, 86400000000000L}; +int64_t tsTickPerDay[] = {86400000L, 86400000000L, 86400000000000L}; // system info char tsOsName[10] = "Linux"; diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index dcb5eadfab6c909ddd169fb3372799f7088c1903..b9d5431de6bc3864a4a13ea30356033de76da178 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -350,8 +350,8 @@ static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) { } static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) { - *minKey = fid * days * tsMsPerDay[precision]; - *maxKey = *minKey + days * tsMsPerDay[precision] - 1; + *minKey = fid * days * tsTickPerDay[precision]; + *maxKey = *minKey + days * tsTickPerDay[precision] - 1; } static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) { diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 82cc6f07f77300aadd554a7c22c0cf77308b3e53..75b072b063772f898753ac46ef8f05ccb490007c 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -17,9 +17,9 @@ #define TSDB_MAX_SUBBLOCKS 8 static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { if (key < 0) { - return (int)((key + 1) / tsMsPerDay[precision] / days - 1); + return (int)((key + 1) / tsTickPerDay[precision] / days - 1); } else { - return (int)((key / tsMsPerDay[precision] / days)); + return (int)((key / tsTickPerDay[precision] / days)); } } @@ -363,9 +363,9 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { TSKEY minKey, midKey, maxKey, now; now = taosGetTimestamp(pCfg->precision); - minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision]; - midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision]; - maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision]; + minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision]; + midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision]; + maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision]; pRtn->minKey = minKey; pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision)); diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 9d8b1ca7f2889f40b696f04a608dd166adf6eac6..50b5d321773f6d9d51d29759eb16a9b07bc7e1d8 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -632,8 +632,8 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { SSubmitBlkIter blkIter = {0}; SDataRow row = NULL; TSKEY now = taosGetTimestamp(pRepo->config.precision); - TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep; - TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; + TSKEY minKey = now - tsTickPerDay[pRepo->config.precision] * pRepo->config.keep; + TSKEY maxKey = now + tsTickPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; terrno = TSDB_CODE_SUCCESS; pMsg->length = htonl(pMsg->length); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 92a0d489b3b28820a20706318883bb7b6a280820..cb50eaf896972d0d80a719db00036ead5cb0b616 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -280,9 +280,17 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa info.tableId.uid = info.pTableObj->tableId.uid; if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - assert(info.lastKey >= pQueryHandle->window.skey); + if (info.lastKey == INT64_MIN) { + info.lastKey = pQueryHandle->window.skey; + } + + assert(info.lastKey >= pQueryHandle->window.skey && info.lastKey <= pQueryHandle->window.ekey); } else { - assert(info.lastKey <= pQueryHandle->window.skey); + if (info.lastKey == INT64_MIN) { + info.lastKey = pQueryHandle->window.ekey; + } + + assert(info.lastKey >= pQueryHandle->window.ekey && info.lastKey <= pQueryHandle->window.skey); } taosArrayPush(pTableCheckInfo, &info); @@ -339,14 +347,44 @@ static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY s return pNew; } +// Update the query time window according to the data time to live(TTL) information, in order to avoid to return +// the expired data to client, even it is queried already. +static int64_t getEarliestValidTimestamp(STsdbRepo* pTsdb) { + STsdbCfg* pCfg = &pTsdb->config; + + int64_t now = taosGetTimestamp(pCfg->precision); + return now - (tsTickPerDay[pCfg->precision] * pCfg->keep); +} + +static void setQueryTimewindow(STsdbQueryHandle* pQueryHandle, STsdbQueryCond* pCond) { + pQueryHandle->window = pCond->twindow; + + int64_t startTs = getEarliestValidTimestamp(pQueryHandle->pTsdb); + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + if (startTs > pQueryHandle->window.skey) { + pQueryHandle->window.skey = startTs; + } + + assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey); + } else { + if (startTs > pQueryHandle->window.ekey) { + pQueryHandle->window.ekey = startTs; + } + + assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey); + } + + tsdbDebug("%p update the query time window, old:%"PRId64" - %"PRId64", new:%"PRId64" - %"PRId64 ", 0x%"PRIx64, + pQueryHandle, pCond->twindow.skey, pCond->twindow.ekey, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qId); +} + static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, uint64_t qId, SMemRef* pMemRef) { STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); if (pQueryHandle == NULL) { - goto out_of_memory; + goto _end; } pQueryHandle->order = pCond->order; - pQueryHandle->window = pCond->twindow; pQueryHandle->pTsdb = tsdb; pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->cur.fid = INT32_MIN; @@ -354,36 +392,33 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC pQueryHandle->checkFiles = true; pQueryHandle->activeIndex = 0; // current active table index pQueryHandle->qId = qId; - pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->allocSize = 0; pQueryHandle->locateStart = false; pQueryHandle->pMemRef = pMemRef; + pQueryHandle->loadType = pCond->type; + + pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->loadExternalRow = pCond->loadExternalRows; pQueryHandle->currentLoadExternalRows = pCond->loadExternalRows; - pQueryHandle->loadType = pCond->type; - if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*)tsdb) != 0) { - goto out_of_memory; + goto _end; } assert(pCond != NULL && pMemRef != NULL); - if (ASCENDING_TRAVERSE(pCond->order)) { - assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey); - } else { - assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey); - } + setQueryTimewindow(pQueryHandle, pCond); + if (pCond->numOfCols > 0) { // allocate buffer in order to load data blocks from file pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis)); if (pQueryHandle->statis == NULL) { - goto out_of_memory; + goto _end; } - pQueryHandle->pColumns = - taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? + // todo: use list instead of array? + pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); if (pQueryHandle->pColumns == NULL) { - goto out_of_memory; + goto _end; } for (int32_t i = 0; i < pCond->numOfCols; ++i) { @@ -392,14 +427,16 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC colInfo.info = pCond->colList[i]; colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); if (colInfo.pData == NULL) { - goto out_of_memory; + goto _end; } + taosArrayPush(pQueryHandle->pColumns, &colInfo); pQueryHandle->statis[i].colId = colInfo.info.colId; } pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); } + STsdbMeta* pMeta = tsdbGetMeta(tsdb); assert(pMeta != NULL); @@ -407,7 +444,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC if (pQueryHandle->pDataCols == NULL) { tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto out_of_memory; + goto _end; } tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); @@ -415,7 +452,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC return (TsdbQueryHandleT) pQueryHandle; - out_of_memory: + _end: tsdbCleanupQueryHandle(pQueryHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; @@ -864,10 +901,10 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio } if (key < 0) { - key -= (daysPerFile * tsMsPerDay[precision]); + key -= (daysPerFile * tsTickPerDay[precision]); } - int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision])); // set the starting fileId + int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerDay[precision])); // set the starting fileId if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32 fid = INT32_MIN; } @@ -3548,7 +3585,6 @@ int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STab taosArrayPush(group, &info); taosArrayPush(pGroupInfo->pGroupList, &group); - return TSDB_CODE_SUCCESS; _error: