diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 95d559b4fa2a849a68ba85c91e72d9f3b8efa3a5..b159ffc5a16a408ccde5c9e3cf9a0eb00c1c1a05 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -324,7 +324,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd tfree(pReducer->discardData); tfree(pReducer->pResultBuf); tfree(pReducer->pFinalRes); -// tfree(pReducer->pBufForInterpo); tfree(pReducer->prevRowOfInput); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -363,7 +362,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (pQueryInfo->fillType != TSDB_FILL_NONE) { SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, - 4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->fillType, pFillCol); + 4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, + tinfo.precision, pQueryInfo->fillType, pFillCol); } int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; @@ -494,7 +494,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { tscTrace("%p waiting for delete procedure, status: %d", pSql, status); } - taosDestoryFillInfo(pLocalReducer->pFillInfo); + pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo); if (pLocalReducer->pCtx != NULL) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { @@ -980,8 +980,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO } /* all output for current group are completed */ - int32_t totalRemainRows = - taosGetNumOfResultWithFill(pFillInfo, rpoints, pFillInfo->slidingTime, actualETime); + int32_t totalRemainRows = getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity); if (totalRemainRows <= 0) { break; } @@ -1267,13 +1266,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no SFillInfo* pFillInfo = pLocalReducer->pFillInfo; if (pFillInfo != NULL) { - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - - TSKEY ekey = taosGetRevisedEndKey(pQueryInfo->window.ekey, pFillInfo->order, pFillInfo->slidingTime, - pQueryInfo->slidingTimeUnit, tinfo.precision); - - taosFillSetStartInfo(pFillInfo, pResBuf->num, ekey); + taosFillSetStartInfo(pFillInfo, pResBuf->num, pQueryInfo->window.ekey); taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf); } @@ -1327,23 +1320,15 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) { SLocalReducer *pLocalReducer = pRes->pLocalReducer; SFillInfo *pFillInfo = pLocalReducer->pFillInfo; - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - - int8_t p = tinfo.precision; - if (pFillInfo != NULL && taosNumOfRemainRows(pFillInfo) > 0) { assert(pQueryInfo->fillType != TSDB_FILL_NONE); tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf; - int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1)); + int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1)); - int32_t remain = taosNumOfRemainRows(pFillInfo); - TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, p); - // the first column must be the timestamp column - int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity); - if (rows > 0) { // do interpo + int32_t rows = getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity); + if (rows > 0) { // do fill gap doFillResult(pSql, pLocalReducer, false); } @@ -1362,10 +1347,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL || prevGroupCompleted) { @@ -1373,9 +1355,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { if (pQueryInfo->fillType != TSDB_FILL_NONE) { int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; - etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, - pQueryInfo->slidingTimeUnit, tinfo.precision); - int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity); + assert(pFillInfo->numOfRows == 0); + int32_t rows = getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity); if (rows > 0) { // do interpo doFillResult(pSql, pLocalReducer, true); } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 66fbf9fc9701d4bc33c0dd4a11b17faa53d361bc..bcd01a322e84f7d596a3f7715b687cba812c8410 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -144,11 +144,11 @@ void taos_init_imp() { } int64_t refreshTime = tsTableMetaKeepTimer; - refreshTime = refreshTime > 2 ? 2 : refreshTime; - refreshTime = refreshTime < 1 ? 1 : refreshTime; + refreshTime = refreshTime > 10 ? 10 : refreshTime; + refreshTime = refreshTime < 10 ? 10 : refreshTime; if (tscCacheHandle == NULL) { - tscCacheHandle = taosCacheInit(tscTmr, refreshTime); + tscCacheHandle = taosCacheInit(refreshTime); } tscTrace("client is initialized successfully"); diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 0e4ffda78bf7d9d9e50101fece7d78b2ef5dabdf..c01141f0d68181713455ab2f58017ec6e112f8b4 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -273,8 +273,7 @@ void dataColSetNullAt(SDataCol *pCol, int index) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->dataOff[index] = pCol->len; char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); - varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE; - setNull(varDataVal(ptr), pCol->type, pCol->bytes); + setVardataNull(ptr, pCol->type); pCol->len += varDataTLen(ptr); } else { setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes); diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 8fb894a588ba6926cc07a6a2444f6d334d883770..bfedff0d941ec490dac30f96e57fbb1061cdcc4e 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -710,7 +710,7 @@ void *readTable(void *sarg) { int32_t code = taos_errno(pSql); if (code != 0) { - fprintf(stderr, "Failed to query:%s\n", taos_errstr(taos)); + fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql)); taos_free_result(pSql); taos_close(taos); exit(EXIT_FAILURE); @@ -779,7 +779,7 @@ void *readMetric(void *sarg) { int32_t code = taos_errno(pSql); if (code != 0) { - fprintf(stderr, "Failed to query:%s\n", taos_errstr(taos)); + fprintf(stderr, "Failed to query:%s\n", taos_errstr(pSql)); taos_free_result(pSql); taos_close(taos); exit(1); @@ -818,7 +818,9 @@ void queryDB(TAOS *taos, char *command) { } if (i == 0) { - fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(taos)); + fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql)); + taos_free_result(pSql); + taos_close(taos); exit(EXIT_FAILURE); } @@ -914,7 +916,7 @@ void callBack(void *param, TAOS_RES *res, int code) { int64_t tmp_time = tb_info->timestamp; if (code < 0) { - fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(tb_info->taos)); + fprintf(stderr, "failed to insert data %d:reason; %s\n", code, taos_errstr(res)); exit(EXIT_FAILURE); } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index ed0e717c437d43398583d6fbb58f7aafb9712f0b..85999f80e7e3e92daf0878d7643127c2771d7e34 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -67,7 +67,7 @@ int32_t mnodeInitProfile() { mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); - tsMnodeConnCache = taosCacheInitWithCb(tsMnodeTmr, CONN_CHECK_TIME, mnodeFreeConn); + tsMnodeConnCache = taosCacheInitWithCb(CONN_CHECK_TIME, mnodeFreeConn); return 0; } diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 30f491ec032bbac8b54dade8b883f15e618520ab..cb448bb564a752a33f9c0c119d47c7bcf7ecb342 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -65,7 +65,7 @@ int32_t mnodeInitShow() { mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); - tsMnodeShowCache = taosCacheInitWithCb(tsMnodeTmr, 10, mnodeFreeShowObj); + tsMnodeShowCache = taosCacheInitWithCb(10, mnodeFreeShowObj); return 0; } diff --git a/src/query/inc/qfill.h b/src/query/inc/qfill.h index 9ea9c8f7cf3df75c182f33ea5122d0752b097334..da1cd8e5de7014d916be47713cdd2de034dc9521 100644 --- a/src/query/inc/qfill.h +++ b/src/query/inc/qfill.h @@ -50,7 +50,8 @@ typedef struct SFillInfo { char * nextValues; // next row of data char** pData; // original result data block involved in filling data int32_t capacityInRows; // data buffer size in rows - + int8_t slidingUnit; // sliding time unit + int8_t precision; // time resoluation SFillColInfo* pFillCol; // column info for fill operations } SFillInfo; @@ -61,12 +62,13 @@ typedef struct SPoint { int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision); -SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, - int32_t numOfCols, int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol); +SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, + int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, + SFillColInfo* pFillCol); void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp); -void taosDestoryFillInfo(SFillInfo *pFillInfo); +void* taosDestoryFillInfo(SFillInfo *pFillInfo); void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); @@ -74,9 +76,7 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput); -TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision); - -int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows); +int64_t getFilledNumOfRes(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); int32_t taosNumOfRemainRows(SFillInfo *pFillInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 65a5ef45d37ed36a7dcc006d8b5ddeed744beae1..cdb05d2288671f6fcb619b01f60e6437ffa5ed97 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1466,7 +1466,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->pCtx); } - taosDestoryFillInfo(pRuntimeEnv->pFillInfo); + pRuntimeEnv->pFillInfo = taosDestoryFillInfo(pRuntimeEnv->pFillInfo); destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); @@ -3557,9 +3557,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { * first result row in the actual result set will fill nothing. */ if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, - pQuery->slidingTimeUnit, pQuery->precision); - int32_t numOfTotal = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pQuery->rec.capacity); + int32_t numOfTotal = getFilledNumOfRes(pFillInfo, pQuery->window.ekey, pQuery->rec.capacity); return numOfTotal > 0; } @@ -3601,7 +3599,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } -int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t numOfRows, int32_t *numOfInterpo) { +int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfInterpo) { SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; @@ -4013,7 +4011,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery); pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput, - pQuery->slidingTime, pQuery->fillType, pColInfo); + pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision, + pQuery->fillType, pColInfo); } // todo refactor @@ -4666,13 +4665,11 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { limitResults(pRuntimeEnv); break; } else { - TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, - pQuery->slidingTimeUnit, pQuery->precision); - taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, ekey); + taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey); taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata); numOfInterpo = 0; - pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, pQuery->rec.rows, &numOfInterpo); + pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo); if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { limitResults(pRuntimeEnv); break; @@ -4704,8 +4701,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { * So, we do keep in this procedure instead of launching retrieve procedure for next results. */ int32_t numOfInterpo = 0; - int32_t remain = taosNumOfRemainRows(pRuntimeEnv->pFillInfo); - pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, remain, &numOfInterpo); + pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo); if (pQuery->rec.rows > 0) { limitResults(pRuntimeEnv); diff --git a/src/query/src/qfill.c b/src/query/src/qfill.c index 7b3ea5c1f09bac392d4e72f9c5870fc64f427c12..59bf7b423c8d291e0377301cd8afe7d63bd33ab1 100644 --- a/src/query/src/qfill.c +++ b/src/query/src/qfill.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "qfill.h" #include "os.h" +#include "qfill.h" #include "qextbuffer.h" #include "taosdef.h" #include "taosmsg.h" @@ -58,7 +58,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, ch } SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, - int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol) { + int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) { if (fillType == TSDB_FILL_NONE) { return NULL; } @@ -72,8 +72,10 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_ pFillInfo->pFillCol = pFillCol; pFillInfo->numOfTags = numOfTags; pFillInfo->numOfCols = numOfCols; + pFillInfo->precision = precision; pFillInfo->slidingTime = slidingTime; - + pFillInfo->slidingUnit = slidingUnit; + pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); int32_t rowsize = 0; @@ -102,9 +104,9 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) { pFillInfo->numOfTotal = 0; } -void taosDestoryFillInfo(SFillInfo* pFillInfo) { +void* taosDestoryFillInfo(SFillInfo* pFillInfo) { if (pFillInfo == NULL) { - return; + return NULL; } tfree(pFillInfo->prevValues); @@ -119,6 +121,15 @@ void taosDestoryFillInfo(SFillInfo* pFillInfo) { tfree(pFillInfo->pFillCol); tfree(pFillInfo); + return NULL; +} + +static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) { + if (order == TSDB_ORDER_ASC) { + return ekey; + } else { + return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision); + } } void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) { @@ -126,8 +137,10 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) return; } + pFillInfo->endKey = taosGetRevisedEndKey(endKey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit, + pFillInfo->precision); + pFillInfo->rowIdx = 0; - pFillInfo->endKey = endKey; pFillInfo->numOfRows = numOfRows; // ensure the space @@ -165,36 +178,29 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu } } -TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) { - if (order == TSDB_ORDER_ASC) { - return ekey; - } else { - return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision); - } -} +int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) { + int64_t* tsList = (int64_t*) pFillInfo->pData[0]; -static int32_t taosGetTotalNumOfFilledRes(SFillInfo* pFillInfo, const TSKEY* tsArray, int32_t remain, - int64_t nInterval, int64_t ekey) { - - if (remain > 0) { // still fill gap within current data block, not generating data after the result set. - TSKEY lastKey = tsArray[pFillInfo->numOfRows - 1]; - int32_t total = (int32_t)(labs(lastKey - pFillInfo->start) / nInterval) + 1; + int32_t numOfRows = taosNumOfRemainRows(pFillInfo); + + TSKEY ekey1 = taosGetRevisedEndKey(ekey, pFillInfo->order, pFillInfo->slidingTime, pFillInfo->slidingUnit, + pFillInfo->precision); - assert(total >= remain); - return total; + int64_t numOfRes = -1; + if (numOfRows > 0) { // still fill gap within current data block, not generating data after the result set. + TSKEY lastKey = tsList[pFillInfo->numOfRows - 1]; + + numOfRes = (int64_t)(labs(lastKey - pFillInfo->start) / pFillInfo->slidingTime) + 1; + assert(numOfRes >= numOfRows); } else { // reach the end of data - if ((ekey < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) || - (ekey > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) { + if ((ekey1 < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) || + (ekey1 > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) { return 0; - } else { - return (int32_t)(labs(ekey - pFillInfo->start) / nInterval) + 1; + } else { // the numOfRes rows are all filled with specified policy + numOfRes = (labs(ekey1 - pFillInfo->start) / pFillInfo->slidingTime) + 1; } } -} -int64_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) { - int32_t numOfRes = taosGetTotalNumOfFilledRes(pFillInfo, (int64_t*) pFillInfo->pData[0], numOfRows, - pFillInfo->slidingTime, ekey); return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes; } @@ -496,8 +502,8 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu int64_t taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) { int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator? - int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity); + int32_t rows = getFilledNumOfRes(pFillInfo, pFillInfo->endKey, capacity); int32_t numOfRes = generateDataBlockImpl(pFillInfo, output, remain, rows, pFillInfo->pData); assert(numOfRes == rows); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index d3a909a1629adb1ee5523a058cb22f70ac55e112..d3bc45e2e4706f2892b4bfd75e8b9b029aa0dc6e 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -233,8 +233,6 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo); pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; -// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded - changeQueryHandleForInterpQuery(pQueryHandle); return pQueryHandle; } @@ -618,54 +616,19 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); + /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); - - TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL; - if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); - - SDataRow row = SL_GET_NODE_DATA(node); - k1 = dataRowKey(row); - - if (k1 == binfo.window.skey) { - if (tSkipListIterNext(pCheckInfo->iter)) { - node = tSkipListIterGet(pCheckInfo->iter); - row = SL_GET_NODE_DATA(node); - k1 = dataRowKey(row); - } else { - k1 = TSKEY_INITIAL_VAL; - } - } - } - - if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) { - SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); - - SDataRow row = SL_GET_NODE_DATA(node); - k2 = dataRowKey(row); - - if (k2 == binfo.window.skey) { - if (tSkipListIterNext(pCheckInfo->iiter)) { - node = tSkipListIterGet(pCheckInfo->iiter); - row = SL_GET_NODE_DATA(node); - k2 = dataRowKey(row); - } else { - k2 = TSKEY_INITIAL_VAL; - } - } - } - + SDataRow row = getSDataRowInTableMem(pCheckInfo); + + TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1); - if ((ASCENDING_TRAVERSE(pQueryHandle->order) && - ((k1 != TSKEY_INITIAL_VAL && k1 <= binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 <= binfo.window.ekey))) || - (!ASCENDING_TRAVERSE(pQueryHandle->order) && - ((k1 != TSKEY_INITIAL_VAL && k1 >= binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 >= binfo.window.skey)))) { + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || + (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { - if ((ASCENDING_TRAVERSE(pQueryHandle->order) && - ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.skey))) || - (!ASCENDING_TRAVERSE(pQueryHandle->order) && - (((k1 != TSKEY_INITIAL_VAL && k1 > binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 > binfo.window.skey))))) { + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || + (!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { + // do not load file block into buffer int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; @@ -756,7 +719,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock return pQueryHandle->realNumOfRows > 0; } -static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { +static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { int firstPos, lastPos, midPos = -1; int numOfRows; TSKEY* keyList; @@ -868,37 +831,63 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap return numOfRows + num; } -static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t capacity, - int32_t numOfRows, SDataRow row, STSchema* pSchema) { - int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); - int32_t numOfTableCols = schemaNCols(pSchema); - +static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, + STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) { char* pData = NULL; - for (int32_t i = 0; i < numOfCols; ++i) { + + // the schema version info is embeded in SDataRow + STSchema* pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row)); + int32_t numOfRowCols = schemaNCols(pSchema); + + int32_t i = 0, j = 0; + while(i < numOfCols && j < numOfRowCols) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - + if (pSchema->columns[j].colId < pColInfo->info.colId) { + j++; + continue; + } + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; } else { pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; } - - int32_t offset = 0; - for (int32_t j = 0; j < numOfTableCols; ++j) { - if (pColInfo->info.colId == pSchema->columns[j].colId) { - offset = pSchema->columns[j].offset; - break; + + if (pSchema->columns[j].colId == pColInfo->info.colId) { + void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + memcpy(pData, value, varDataTLen(value)); + } else { + memcpy(pData, value, pColInfo->info.bytes); + } + + j++; + i++; + } else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pData, pColInfo->info.type); + } else { + setNull(pData, pColInfo->info.type, pColInfo->info.bytes); } + i++; } - - assert(offset != -1); // todo handle error - void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset); - + } + + while (i < numOfCols) { // the remain columns are all null data + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - memcpy(pData, value, varDataTLen(value)); + setVardataNull(pData, pColInfo->info.type); } else { - memcpy(pData, value, pColInfo->info.bytes); + setNull(pData, pColInfo->info.type, pColInfo->info.bytes); } + + i++; } } @@ -911,7 +900,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* initTableMemIterator(pQueryHandle, pCheckInfo); SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; - + + // for search the endPos, so the order needs to reverse + int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + + int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; + int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); + + STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); + STable* pTable = pCheckInfo->pTableObj; + int32_t endPos = cur->pos; if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { endPos = blockInfo.rows - 1; @@ -920,8 +918,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* endPos = 0; cur->mixBlock = (cur->pos != blockInfo.rows - 1); } else { - int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; - endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); + assert(pCols->numOfRows > 0); + endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); cur->mixBlock = true; } @@ -933,8 +931,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* int32_t numOfRows = 0; pQueryHandle->cur.win = TSWINDOW_INITIALIZER; - int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; - + // no data in buffer, load data from file directly if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { int32_t start = cur->pos; @@ -950,12 +947,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* // todo opt in case of no data in buffer numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); - // if the buffer is not full in case of descending order query, move the data in the front of the buffer + // if the buffer is not full in case of descending order query, move the data in the front of the buffer if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) { int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; - int32_t reqNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); - - for(int32_t i = 0; i < reqNumOfCols; ++i) { + + for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); } @@ -969,20 +965,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pQueryHandle->realNumOfRows = numOfRows; cur->rows = numOfRows; return; - } else if (pCheckInfo->iter != NULL && pCheckInfo->iiter == NULL) { - // } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) { - // } else { // iter and iiter are all not NULL, three-way merge data block - STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj); + } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; - do { - node = tSkipListIterGet(pCheckInfo->iter); - if (node == NULL) { + SDataRow row = getSDataRowInTableMem(pCheckInfo); + if (row == NULL) { break; } - SDataRow row = SL_GET_NODE_DATA(node); - TSKEY key = dataRowKey(row); + TSKEY key = dataRowKey(row); if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { break; @@ -995,7 +986,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { - copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema); + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1005,17 +996,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* cur->lastKey = key + step; cur->mixBlock = true; - tSkipListIterNext(pCheckInfo->iter); + moveToNextRow(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it - tSkipListIterNext(pCheckInfo->iter); + moveToNextRow(pCheckInfo); } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; } - int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); + int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it tSkipListIterNext(pCheckInfo->iter); } @@ -1093,9 +1083,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* // if the buffer is not full in case of descending order query, move the data in the front of the buffer if (numOfRows < pQueryHandle->outputCapacity) { int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; - - int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); - for(int32_t i = 0; i < requiredNumOfCols; ++i) { + for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); } @@ -1567,9 +1555,6 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { for(int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); - if (pCheckInfo->pTableObj->tableId.uid == 12094628167747) { - printf("abc\n"); - } if (pCheckInfo->pTableObj->lastKey > key) { key = pCheckInfo->pTableObj->lastKey; index = i; @@ -1652,9 +1637,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int *skey = TSKEY_INITIAL_VAL; int64_t st = taosGetTimestampUs(); - STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj); - int32_t numOfTableCols = schemaNCols(pSchema); - + STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); + STable* pTable = pCheckInfo->pTableObj; + do { SDataRow row = getSDataRowInTableMem(pCheckInfo); if (row == NULL) { @@ -1662,10 +1647,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } TSKEY key = dataRowKey(row); - - if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || - (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { - + if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { tsdbTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey, pQueryHandle->window.ekey); @@ -1677,59 +1659,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } *ekey = key; - char* pData = NULL; - - int32_t i = 0, j = 0; - while(i < numOfCols && j < numOfTableCols) { - SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - if (pSchema->columns[j].colId < pColInfo->info.colId) { - j++; - continue; - } - - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; - } else { - pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes; - } - - if (pSchema->columns[j].colId == pColInfo->info.colId) { - void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - memcpy(pData, value, varDataTLen(value)); - } else { - memcpy(pData, value, pColInfo->info.bytes); - } - - j++; - i++; - } else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pData, pColInfo->info.type); - } else { - setNull(pData, pColInfo->info.type, pColInfo->info.bytes); - } - i++; - } - } - - while (i < numOfCols) { // the remain columns are all null data - SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; - } else { - pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes; - } - - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pData, pColInfo->info.type); - } else { - setNull(pData, pColInfo->info.type, pColInfo->info.bytes); - } - - i++; - } - + copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable); + if (++numOfRows >= maxRowsToRead) { moveToNextRow(pCheckInfo); break; diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index 2369f63f16fefa3bfd49d0b7837eab011e17c798..17b38238316bf74d67824b119b774a8cd5804860 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -37,8 +37,8 @@ typedef struct SCacheDataNode { uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache uint64_t signature; uint32_t size; // allocated size for current SCacheDataNode - uint16_t keySize : 15; - bool inTrash : 1; // denote if it is in trash or not + uint16_t keySize: 15; + bool inTrashCan: 1;// denote if it is in trash or not T_REF_DECLARE() char *key; char data[]; @@ -50,46 +50,49 @@ typedef struct STrashElem { SCacheDataNode *pData; } STrashElem; +/* + * to accommodate the old data which has the same key value of new one in hashList + * when an new node is put into cache, if an existed one with the same key: + * 1. if the old one does not be referenced, update it. + * 2. otherwise, move the old one to pTrash, addedTime the new one. + * + * when the node in pTrash does not be referenced, it will be release at the expired expiredTime + */ typedef struct { - int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. - int64_t refreshTime; - - /* - * to accommodate the old datanode which has the same key value of new one in hashList - * when an new node is put into cache, if an existed one with the same key: - * 1. if the old one does not be referenced, update it. - * 2. otherwise, move the old one to pTrash, addedTime the new one. - * - * when the node in pTrash does not be referenced, it will be release at the expired expiredTime - */ - STrashElem * pTrash; - void * tmrCtrl; - void * pTimer; - SCacheStatis statistics; - SHashObj * pHashTable; + int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. + int64_t refreshTime; + STrashElem * pTrash; + void * tmrCtrl; + void * pTimer; + SCacheStatis statistics; + SHashObj * pHashTable; _hash_free_fn_t freeFp; - int numOfElemsInTrash; // number of element in trash - int16_t deleting; // set the deleting flag to stop refreshing ASAP. - T_REF_DECLARE() + uint32_t numOfElemsInTrash; // number of element in trash + uint8_t deleting; // set the deleting flag to stop refreshing ASAP. + pthread_t refreshWorker; #if defined(LINUX) pthread_rwlock_t lock; #else pthread_mutex_t lock; #endif - } SCacheObj; /** - * - * @param maxSessions maximum slots available for hash elements - * @param tmrCtrl timer ctrl + * initialize the cache object * @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and * not referenced by other objects * @return */ -SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTimeInSeconds); -SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTimeInSeconds, void (*freeCb)(void *data)); +SCacheObj *taosCacheInit(int64_t refreshTimeInSeconds); + +/** + * initialize the cache object and set the free object callback function + * @param refreshTimeInSeconds + * @param freeCb + * @return + */ +SCacheObj *taosCacheInitWithCb(int64_t refreshTimeInSeconds, void (*freeCb)(void *data)); /** * add data into cache diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index b0291b5cc0e9e3bf99ec2825582e7786629f2c6f..3ce051fceea2555187bd954b3d9bcfdd6345d850 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -77,31 +77,7 @@ static FORCE_INLINE void taosFreeNode(void *data) { * @param lifespan total survial expiredTime from now * @return SCacheDataNode */ -static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, - uint64_t duration) { - size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1; - - SCacheDataNode *pNewNode = calloc(1, totalSize); - if (pNewNode == NULL) { - uError("failed to allocate memory, reason:%s", strerror(errno)); - return NULL; - } - - memcpy(pNewNode->data, pData, size); - - pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size; - pNewNode->keySize = keyLen; - - memcpy(pNewNode->key, key, keyLen); - - pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); - pNewNode->expiredTime = pNewNode->addedTime + duration; - - pNewNode->signature = (uint64_t)pNewNode; - pNewNode->size = (uint32_t)totalSize; - - return pNewNode; -} +static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration); /** * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash @@ -109,50 +85,15 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const * @param pCacheObj Cache object * @param pNode Cache slot object */ -static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { - if (pNode->inTrash) { /* node is already in trash */ - return; - } - - STrashElem *pElem = calloc(1, sizeof(STrashElem)); - pElem->pData = pNode; - - pElem->next = pCacheObj->pTrash; - if (pCacheObj->pTrash) { - pCacheObj->pTrash->prev = pElem; - } - - pElem->prev = NULL; - pCacheObj->pTrash = pElem; - - pNode->inTrash = true; - pCacheObj->numOfElemsInTrash++; - - uTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash); -} +static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode); + +/** + * remove node in trash can + * @param pCacheObj + * @param pElem + */ +static void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem); -static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) { - if (pElem->pData->signature != (uint64_t)pElem->pData) { - uError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData); - return; - } - - pCacheObj->numOfElemsInTrash--; - if (pElem->prev) { - pElem->prev->next = pElem->next; - } else { /* pnode is the header, update header */ - pCacheObj->pTrash = pElem->next; - } - - if (pElem->next) { - pElem->next->prev = pElem->prev; - } - - pElem->pData->signature = 0; - if (pCacheObj->freeFp) pCacheObj->freeFp(pElem->pData->data); - free(pElem->pData); - free(pElem); -} /** * remove nodes in trash with refCount == 0 in cache * @param pNode @@ -160,42 +101,7 @@ static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) { * @param force force model, if true, remove data in trash without check refcount. * may cause corruption. So, forece model only applys before cache is closed */ -static void taosTrashEmpty(SCacheObj *pCacheObj, bool force) { - __cache_wr_lock(pCacheObj); - - if (pCacheObj->numOfElemsInTrash == 0) { - if (pCacheObj->pTrash != NULL) { - uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash); - } - pCacheObj->pTrash = NULL; - - __cache_unlock(pCacheObj); - return; - } - - STrashElem *pElem = pCacheObj->pTrash; - - while (pElem) { - T_REF_VAL_CHECK(pElem->pData); - if (pElem->next == pElem) { - pElem->next = NULL; - } - - if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { - uTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, - pCacheObj->numOfElemsInTrash - 1); - STrashElem *p = pElem; - - pElem = pElem->next; - taosRemoveFromTrash(pCacheObj, p); - } else { - pElem = pElem->next; - } - } - - assert(pCacheObj->numOfElemsInTrash >= 0); - __cache_unlock(pCacheObj); -} +static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force); /** * release node @@ -304,87 +210,20 @@ static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, con return pNode; } -static void doCleanupDataCache(SCacheObj *pCacheObj) { - __cache_wr_lock(pCacheObj); - - //if (taosHashGetSize(pCacheObj->pHashTable) > 0) { - taosHashCleanup(pCacheObj->pHashTable); - //} - - __cache_unlock(pCacheObj); - - taosTrashEmpty(pCacheObj, true); - __cache_lock_destroy(pCacheObj); - - memset(pCacheObj, 0, sizeof(SCacheObj)); - free(pCacheObj); -} +/** + * do cleanup the taos cache + * @param pCacheObj + */ +static void doCleanupDataCache(SCacheObj *pCacheObj); /** * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime * @param handle Cache object handle */ -static void taosCacheRefresh(void *handle, void *tmrId) { - SCacheObj *pCacheObj = (SCacheObj *)handle; - - if (pCacheObj == NULL || T_REF_VAL_GET(pCacheObj) == 0) { - uTrace("object is destroyed. no refresh retry"); - return; - } - - int16_t ref = T_REF_INC(pCacheObj); - if (ref == 1) { - T_REF_DEC(pCacheObj); - return; - } - - // todo add the ref before start the timer - int32_t num = taosHashGetSize(pCacheObj->pHashTable); - if (num == 0) { - ref = T_REF_DEC(pCacheObj); - if (ref == 0) { - doCleanupDataCache(pCacheObj); - } else { - taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer); - } - return; - } - - uint64_t expiredTime = taosGetTimestampMs(); - pCacheObj->statistics.refreshCount++; - - SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); - - __cache_wr_lock(pCacheObj); - while (taosHashIterNext(pIter)) { - if (pCacheObj->deleting == 1) { - taosHashDestroyIter(pIter); - break; - } - - SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); - if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { - taosCacheReleaseNode(pCacheObj, pNode); - } - } - - __cache_unlock(pCacheObj); - - taosHashDestroyIter(pIter); +static void* taosCacheRefresh(void *handle); - taosTrashEmpty(pCacheObj, false); - - ref = T_REF_DEC(pCacheObj); - if (ref == 0) { - doCleanupDataCache(pCacheObj); - return; - } else { - taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer); - } -} - -SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb)(void *data)) { - if (tmrCtrl == NULL || refreshTime <= 0) { +SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) { + if (refreshTime <= 0) { return NULL; } @@ -394,7 +233,7 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb return NULL; } - pCacheObj->pHashTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false); + pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false); if (pCacheObj->pHashTable == NULL) { free(pCacheObj); uError("failed to allocate memory, reason:%s", strerror(errno)); @@ -406,25 +245,27 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb pCacheObj->freeFp = freeCb; pCacheObj->refreshTime = refreshTime * 1000; - pCacheObj->tmrCtrl = tmrCtrl; - - taosTmrReset(taosCacheRefresh, pCacheObj->refreshTime, pCacheObj, pCacheObj->tmrCtrl, &pCacheObj->pTimer); - + if (__cache_lock_init(pCacheObj) != 0) { - taosTmrStopA(&pCacheObj->pTimer); taosHashCleanup(pCacheObj->pHashTable); free(pCacheObj); uError("failed to init lock, reason:%s", strerror(errno)); return NULL; } - - T_REF_INC(pCacheObj); + + pthread_attr_t thattr = {0}; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + pthread_create(&pCacheObj->refreshWorker, &thattr, taosCacheRefresh, pCacheObj); + + pthread_attr_destroy(&thattr); return pCacheObj; } -SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) { - return taosCacheInitWithCb(tmrCtrl, refreshTime, NULL); +SCacheObj *taosCacheInit(int64_t refreshTime) { + return taosCacheInitWithCb(refreshTime, NULL); } void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, size_t dataSize, int duration) { @@ -600,16 +441,188 @@ void taosCacheEmpty(SCacheObj *pCacheObj) { __cache_unlock(pCacheObj); taosHashDestroyIter(pIter); - taosTrashEmpty(pCacheObj, false); + taosTrashCanEmpty(pCacheObj, false); } void taosCacheCleanup(SCacheObj *pCacheObj) { if (pCacheObj == NULL) { return; } - - int32_t ref = T_REF_DEC(pCacheObj); - if (ref == 0) { - doCleanupDataCache(pCacheObj); + + pCacheObj->deleting = 1; + pthread_join(pCacheObj->refreshWorker, NULL); + + doCleanupDataCache(pCacheObj); +} + +SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, + uint64_t duration) { + size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1; + + SCacheDataNode *pNewNode = calloc(1, totalSize); + if (pNewNode == NULL) { + uError("failed to allocate memory, reason:%s", strerror(errno)); + return NULL; + } + + memcpy(pNewNode->data, pData, size); + + pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size; + pNewNode->keySize = keyLen; + + memcpy(pNewNode->key, key, keyLen); + + pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); + pNewNode->expiredTime = pNewNode->addedTime + duration; + + pNewNode->signature = (uint64_t)pNewNode; + pNewNode->size = (uint32_t)totalSize; + + return pNewNode; +} + +void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { + if (pNode->inTrashCan) { /* node is already in trash */ + return; + } + + STrashElem *pElem = calloc(1, sizeof(STrashElem)); + pElem->pData = pNode; + + pElem->next = pCacheObj->pTrash; + if (pCacheObj->pTrash) { + pCacheObj->pTrash->prev = pElem; + } + + pElem->prev = NULL; + pCacheObj->pTrash = pElem; + + pNode->inTrashCan = true; + pCacheObj->numOfElemsInTrash++; + + uTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pCacheObj->numOfElemsInTrash); +} + +void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { + if (pElem->pData->signature != (uint64_t)pElem->pData) { + uError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData); + return; + } + + pCacheObj->numOfElemsInTrash--; + if (pElem->prev) { + pElem->prev->next = pElem->next; + } else { /* pnode is the header, update header */ + pCacheObj->pTrash = pElem->next; + } + + if (pElem->next) { + pElem->next->prev = pElem->prev; + } + + pElem->pData->signature = 0; + if (pCacheObj->freeFp) pCacheObj->freeFp(pElem->pData->data); + free(pElem->pData); + free(pElem); +} + +void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { + __cache_wr_lock(pCacheObj); + + if (pCacheObj->numOfElemsInTrash == 0) { + if (pCacheObj->pTrash != NULL) { + uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash); + } + pCacheObj->pTrash = NULL; + + __cache_unlock(pCacheObj); + return; } + + STrashElem *pElem = pCacheObj->pTrash; + + while (pElem) { + T_REF_VAL_CHECK(pElem->pData); + if (pElem->next == pElem) { + pElem->next = NULL; + } + + if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { + uTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, + pCacheObj->numOfElemsInTrash - 1); + STrashElem *p = pElem; + + pElem = pElem->next; + taosRemoveFromTrashCan(pCacheObj, p); + } else { + pElem = pElem->next; + } + } + + assert(pCacheObj->numOfElemsInTrash >= 0); + __cache_unlock(pCacheObj); +} + +void doCleanupDataCache(SCacheObj *pCacheObj) { + __cache_wr_lock(pCacheObj); + taosHashCleanup(pCacheObj->pHashTable); + __cache_unlock(pCacheObj); + + taosTrashCanEmpty(pCacheObj, true); + __cache_lock_destroy(pCacheObj); + + memset(pCacheObj, 0, sizeof(SCacheObj)); + free(pCacheObj); } + +void* taosCacheRefresh(void *handle) { + SCacheObj *pCacheObj = (SCacheObj *)handle; + if (pCacheObj == NULL) { + uTrace("object is destroyed. no refresh retry"); + return NULL; + } + + const int32_t SLEEP_DURATION = 500; //500 ms + int64_t totalTick = pCacheObj->refreshTime / SLEEP_DURATION; + + int64_t count = 0; + while(1) { + taosMsleep(500); + + // check if current cache object will be deleted every 500ms. + if (pCacheObj->deleting) { + break; + } + + if (++count < totalTick) { + continue; + } + + // reset the count value + count = 0; + size_t num = taosHashGetSize(pCacheObj->pHashTable); + if (num == 0) { + continue; + } + + uint64_t expiredTime = taosGetTimestampMs(); + pCacheObj->statistics.refreshCount++; + + SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); + + __cache_wr_lock(pCacheObj); + while (taosHashIterNext(pIter)) { + SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); + if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { + taosCacheReleaseNode(pCacheObj, pNode); + } + } + + __cache_unlock(pCacheObj); + + taosHashDestroyIter(pIter); + taosTrashCanEmpty(pCacheObj, false); + } + + return NULL; +} \ No newline at end of file diff --git a/src/util/tests/cacheTest.cpp b/src/util/tests/cacheTest.cpp index b636bcfac0c1252d087f570b0c99a2a2f7ec812b..5762d5700bbb1945490846dcfaaf0e12f0fbaa27 100644 --- a/src/util/tests/cacheTest.cpp +++ b/src/util/tests/cacheTest.cpp @@ -19,8 +19,7 @@ int32_t tsMaxMeterConnections = 200; // test cache TEST(testCase, client_cache_test) { const int32_t REFRESH_TIME_IN_SEC = 2; - void* tscTmr = taosTmrInit (tsMaxMgmtConnections*2, 200, 6000, "TSC"); - SCacheObj* tscCacheHandle = taosCacheInit(tscTmr, REFRESH_TIME_IN_SEC); + SCacheObj* tscCacheHandle = taosCacheInit(REFRESH_TIME_IN_SEC); const char* key1 = "test1"; char data1[] = "test11"; @@ -106,9 +105,7 @@ TEST(testCase, client_cache_test) { TEST(testCase, cache_resize_test) { const int32_t REFRESH_TIME_IN_SEC = 2; - void* tscTmr = taosTmrInit (1000*2, 200, 6000, "TSC"); - - auto* pCache = taosCacheInit(tscTmr, REFRESH_TIME_IN_SEC); + auto* pCache = taosCacheInit(REFRESH_TIME_IN_SEC); char key[256] = {0}; char data[1024] = "abcdefghijk";