From 3d79ac346fb3362ae693e30e4c7d41d0b957091c Mon Sep 17 00:00:00 2001 From: AlexDuan <417921451@qq.com> Date: Mon, 23 Aug 2021 13:51:10 +0800 Subject: [PATCH] qRequire ref add --- src/client/src/tscSystem.c | 2 +- src/inc/tsdb.h | 5 +- src/mnode/src/mnodeProfile.c | 4 +- src/mnode/src/mnodeShow.c | 4 +- src/plugins/http/src/httpContext.c | 4 +- src/plugins/http/src/httpSession.c | 2 +- src/query/src/qExecutor.c | 2 +- src/query/src/queryMain.c | 133 +++++++++++++++++------------ src/tsdb/inc/tsdbHealth.h | 3 + src/tsdb/inc/tsdbint.h | 1 + src/tsdb/src/tsdbBuffer.c | 12 ++- src/tsdb/src/tsdbHealth.c | 28 ++++-- src/tsdb/src/tsdbMain.c | 6 ++ src/util/inc/tcache.h | 4 +- src/util/src/tcache.c | 21 ++--- 15 files changed, 148 insertions(+), 83 deletions(-) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index c04765b065..dc4a32cc13 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -59,7 +59,7 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) { taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } -void tscFreeRpcObj(void *param) { +void tscFreeRpcObj(void *param, void* param1) { assert(param); SRpcObj *pRpcObj = (SRpcObj *)(param); tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn); diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 52c99a3fe5..b2219a53a7 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -416,8 +416,9 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd); int tsdbCompact(STsdbRepo *pRepo); // For TSDB Health Monitor -bool tsdbAllowNewBlock(STsdbRepo* pRepo); -bool tsdbIdleMemEnough(); + +// no problem return true +bool tsdbNoProblem(STsdbRepo* pRepo); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 15438fc234..5c3063128c 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -46,7 +46,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi static void mnodeCancelGetNextConn(void *pIter); static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mnodeFreeConn(void *data); +static void mnodeFreeConn(void *data, void* param1); static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg); @@ -135,7 +135,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po return pConn; } -static void mnodeFreeConn(void *data) { +static void mnodeFreeConn(void *data, void* param1) { SConnObj *pConn = data; tfree(pConn->pQueries); tfree(pConn->pStreams); diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index bbfdb52e05..4e3c4797ac 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -46,7 +46,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg); -static void mnodeFreeShowObj(void *data); +static void mnodeFreeShowObj(void *data, void* param1); static bool mnodeAccquireShowObj(SShowObj *pShow); static bool mnodeCheckShowFinished(SShowObj *pShow); static void *mnodePutShowObj(SShowObj *pShow); @@ -420,7 +420,7 @@ static void* mnodePutShowObj(SShowObj *pShow) { return NULL; } -static void mnodeFreeShowObj(void *data) { +static void mnodeFreeShowObj(void *data, void* param1) { SShowObj *pShow = *(SShowObj **)data; if (tsMnodeShowFreeIterFp[pShow->type] != NULL) { if (pShow->pVgIter != NULL) { diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 51adef11b9..7631c6d668 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -29,7 +29,7 @@ #include "httpContext.h" #include "httpParser.h" -static void httpDestroyContext(void *data); +static void httpDestroyContext(void *data, void* param1); static void httpRemoveContextFromEpoll(HttpContext *pContext) { HttpThread *pThread = pContext->pThread; @@ -44,7 +44,7 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) { } } -static void httpDestroyContext(void *data) { +static void httpDestroyContext(void *data, void* param1) { HttpContext *pContext = *(HttpContext **)data; if (pContext->fd > 0) taosCloseSocket(pContext->fd); diff --git a/src/plugins/http/src/httpSession.c b/src/plugins/http/src/httpSession.c index 2e1ee7df2f..fd2415a5ab 100644 --- a/src/plugins/http/src/httpSession.c +++ b/src/plugins/http/src/httpSession.c @@ -95,7 +95,7 @@ void httpReleaseSession(HttpContext *pContext) { pContext->session = NULL; } -static void httpDestroySession(void *data) { +static void httpDestroySession(void *data, void* param1) { HttpSession *session = data; httpDebug("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 303612fc8e..e22552c265 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2387,7 +2387,7 @@ bool isQueryKilled(SQInfo *pQInfo) { (!needBuildResAfterQueryComplete(pQInfo))) { assert(pQInfo->startExecTs != 0); - qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d sec, abort current query execution, start:%" PRId64 + qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64 ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); return true; } diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index d9b01b031d..a409d955ad 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -35,12 +35,12 @@ typedef struct SQueryMgmt { bool closed; } SQueryMgmt; -static void queryMgmtKillQueryFn(void* handle) { +static void queryMgmtKillQueryFn(void* handle, void* param1) { void** fp = (void**)handle; qKillQuery(*fp); } -static void freeqinfoFn(void *qhandle) { +static void freeqinfoFn(void *qhandle, void* param1) { void** handle = qhandle; if (handle == NULL || *handle == NULL) { return; @@ -254,8 +254,6 @@ int waitMoment(SQInfo* pQInfo){ } } } - - taosMsleep(ms); } return 1; } @@ -274,7 +272,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { } *qId = pQInfo->qId; - pQInfo->startExecTs = taosGetTimestampSec(); + pQInfo->startExecTs = taosGetTimestampMs(); if (isQueryKilled(pQInfo)) { qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId); @@ -522,7 +520,7 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) { pQueryMgmt->closed = true; pthread_mutex_unlock(&pQueryMgmt->lock); - taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn); + taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn, NULL); } void qQueryMgmtReOpen(void *pQMgmt) { @@ -641,95 +639,124 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo } } + qReleaseQInfo(pMgmt, (void **)&handle, true); return error; } // local struct typedef struct { int64_t qId; - int32_t timeMs; + int64_t startExecTs; + int64_t commitedMs; } SLongQuery; -// compare -int compareLongQuery(const void* p1, const void* p2) { +// callbark for sort compare +static int compareLongQuery(const void* p1, const void* p2) { // sort desc SLongQuery* plq1 = (SLongQuery*)p1; SLongQuery* plq2 = (SLongQuery*)p2; - if(plq1->timeMs == plq2->timeMs) { + if(plq1->startExecTs == plq2->startExecTs) { return 0; - } else if(plq1->timeMs > plq2->timeMs) { + } else if(plq1->startExecTs > plq2->startExecTs) { return -1; } else { return 1; } } +// callback for taosCacheRefresh +static void cbFoundItem(void* handle, void* param1) { + SQInfo * qInfo = *(SQInfo**) handle; + if(qInfo == NULL) return ; + SArray* qids = (SArray*) param1; + if(qids == NULL) return ; + + bool usedMem = true; + bool usedIMem = true; + SMemTable* mem = qInfo->query.memRef.snapshot.omem; + SMemTable* imem = qInfo->query.memRef.snapshot.imem; + if(mem == NULL || T_REF_VAL_GET(mem) == 0) + usedMem = false; + if(imem == NULL || T_REF_VAL_GET(mem) == 0) + usedIMem = false ; + + if(!usedMem && !usedIMem) + return ; + + // push to qids + SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery)); + plq->qId = qInfo->qId; + plq->startExecTs = qInfo->startExecTs; + + // commitedMs + if(imem) { + plq->commitedMs = imem->commitedMs; + } else { + plq->commitedMs = 0; + } + + taosArrayPush(qids, &plq); +} + // longquery -void* qObtainLongQuery(void* param, int32_t longMs){ +void* qObtainLongQuery(void* param){ SQueryMgmt* qMgmt = (SQueryMgmt*)param; - if(qMgmt == NULL || qMgmt->qinfoPool == NULL) return NULL; + if(qMgmt == NULL || qMgmt->qinfoPool == NULL) + return NULL; SArray* qids = taosArrayInit(4, sizeof(int64_t*)); + if(qids == NULL) return NULL; + // Get each item + taosCacheRefresh(qMgmt->qinfoPool, cbFoundItem, qids); - SHashObj* pHashTable = qMgmt->qinfoPool->pHashTable; - if(pHashTable == NULL || pHashTable->hashList == NULL) return NULL; - - SQInfo * qInfo = (SQInfo*)taosHashIterate(pHashTable, NULL); - while(qInfo){ - // judge long query - SMemTable* imem = qInfo->runtimeEnv.pQueryAttr->memRef.snapshot.imem; - if(imem == NULL || imem->commitedMs == 0) continue; - int64_t now = taosGetTimestampMs(); - if(imem->commitedMs > now) continue; // weird, so skip - - int32_t passMs = now - imem->commitedMs; - if(passMs < longMs) { - continue; - } - - // push - SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery)); - plq->timeMs = passMs; - plq->qId = qInfo->qId; - taosArrayPush(qids, plq); - - // next - qInfo = (SQInfo*)taosHashIterate(pHashTable, qInfo); - } - size_t cnt = taosArrayGetSize(qids); if(cnt == 0) { - taosArrayDestroyEx(qids, free); + taosArrayDestroy(qids); return NULL; } - if(cnt > 1) { + if(cnt > 1) taosArraySort(qids, compareLongQuery); - } return qids; } //solve tsdb no block to commit -bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) { +bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) { SQueryMgmt *pQueryMgmt = pMgmt; - int32_t longMs = 2000; // TODO config to taos.cfg + bool fixed = false; // qid top list - SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt, longMs); + SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt); if(qids == NULL) return false; // kill Query + int64_t now = taosGetTimestampMs(); size_t cnt = taosArrayGetSize(qids); + size_t i; SLongQuery* plq; - for(size_t i=0; i < cnt; i++) { + for(i=0; i < cnt; i++) { plq = (SLongQuery* )taosArrayGetP(qids, i); - qKillQueryByQId(pMgmt, plq->qId, 100, 50); // wait 50*100 ms - - // check break condition - if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) { - break; + if(plq->startExecTs > now) continue; + if(now - plq->startExecTs >= longQueryMs) { + qKillQueryByQId(pMgmt, plq->qId, 100, 30); // wait 50*100 ms + if(tsdbNoProblem(pRepo)) { + fixed = true; + break; + } } } + // free qids - taosArrayDestroyEx(qids, free); - return true; + for(i=0; i < cnt; i++) { + free(taosArrayGetP(qids, i)); + } + taosArrayDestroy(qids); + return fixed; +} + +//solve tsdb no block to commit +bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) { + if(qFixedNoBlock(pRepo, pMgmt, 20*1000)) { + return true; + } + return qFixedNoBlock(pRepo, pMgmt, 5*1000); } \ No newline at end of file diff --git a/src/tsdb/inc/tsdbHealth.h b/src/tsdb/inc/tsdbHealth.h index e70c26f939..324f4312e0 100644 --- a/src/tsdb/inc/tsdbHealth.h +++ b/src/tsdb/inc/tsdbHealth.h @@ -19,4 +19,7 @@ bool tsdbUrgeQueryFree(STsdbRepo* pRepo); int32_t tsdbInsertNewBlock(STsdbRepo* pRepo); +bool tsdbIdleMemEnough(); +bool tsdbAllowNewBlock(STsdbRepo* pRepo); + #endif /* _TD_TSDB_BUFFER_H_ */ diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 532907ae01..3bbc4bd111 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -97,6 +97,7 @@ struct STsdbRepo { SMergeBuf mergeBuf; //used when update=2 int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? + void* tmrCtrl; }; #define REPO_ID(r) (r)->config.tsdbId diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 06a98323ab..af75fc45fd 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -64,6 +64,10 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { ASSERT(pPool != NULL); + // debug test + pCfg->cacheBlockSize = 1; + pCfg->totalBlocks = 4; + pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB pPool->tBufBlocks = pCfg->totalBlocks; pPool->nBufBlocks = 0; @@ -119,16 +123,22 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { STsdbBufPool *pBufPool = pRepo->pPool; while (POOL_IS_EMPTY(pBufPool)) { + tsdbWarn("vgId:%d Pool empty,nBufBlocks=%d nElastic=%d nRecycle=%d", REPO_ID(pRepo), pBufPool->nBufBlocks, pBufPool->nElasticBlocks, pBufPool->nRecycleBlocks); // supply new Block if(tsdbInsertNewBlock(pRepo) > 0) { + tsdbWarn("vgId:%d Insert new block to solve.", REPO_ID(pRepo)); break; } else { // no newBlock, kill query free - tsdbUrgeQueryFree(pRepo); + if(!tsdbUrgeQueryFree(pRepo)) { + tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo)); + } } pRepo->repoLocked = false; + tsdbDebug("vgId:%d wait for new block...", REPO_ID(pRepo)); pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); + tsdbDebug("vgId:%d waited new block ok.", REPO_ID(pRepo)); pRepo->repoLocked = true; } diff --git a/src/tsdb/src/tsdbHealth.c b/src/tsdb/src/tsdbHealth.c index 32f6f4fdb8..b590df28fe 100644 --- a/src/tsdb/src/tsdbHealth.c +++ b/src/tsdb/src/tsdbHealth.c @@ -18,12 +18,14 @@ #include "tarray.h" #include "query.h" #include "tglobal.h" +#include "tlist.h" #include "tsdbint.h" #include "tsdbBuffer.h" #include "tsdbLog.h" #include "tsdbHealth.h" #include "ttimer.h" + // return malloc new block count int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { STsdbBufPool *pPool = pRepo->pPool; @@ -56,24 +58,28 @@ void cbKillQueryFree(void* param1, void* param2) { // return true do free , false do nothing bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { // 1 start timer - tmr_h hTimer = taosTmrStart(cbKillQueryFree, 1, pRepo, NULL); + if(pRepo->tmrCtrl == NULL){ + pRepo->tmrCtrl = taosTmrInit(0, 0, 0, "REPO"); + } + + tmr_h hTimer = taosTmrStart(cbKillQueryFree, 1, pRepo, pRepo->tmrCtrl); return hTimer != NULL; } bool tsdbIdleMemEnough() { // TODO config to taos.cfg - int32_t lowestRate = 20; // below 20% idle memory, return not enough memory + int32_t lowestRate = 10; // below 10% idle memory, return not enough memory float memoryUsedMB = 0; float memoryAvailMB; - if (true != taosGetSysMemory(&memoryUsedMB)) { + if (!taosGetSysMemory(&memoryUsedMB)) { tsdbWarn("tsdbHealth get memory error, return false."); - return false; + return true; } if(memoryUsedMB > tsTotalMemoryMB || tsTotalMemoryMB == 0) { tsdbWarn("tsdbHealth used memory(%d MB) large total memory(%d MB), return false.", (int)memoryUsedMB, (int)tsTotalMemoryMB); - return false; + return true; } memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB; @@ -88,7 +94,7 @@ bool tsdbIdleMemEnough() { bool tsdbAllowNewBlock(STsdbRepo* pRepo) { //TODO config to taos.cfg - int32_t nMaxElastic = 3; + int32_t nMaxElastic = 0; STsdbBufPool* pPool = pRepo->pPool; if(pPool->nElasticBlocks >= nMaxElastic) { tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic); @@ -96,3 +102,13 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) { } return true; } + +bool tsdbNoProblem(STsdbRepo* pRepo) { + if(!tsdbIdleMemEnough()) + return false; + + if(listNEles(pRepo->pPool->bufBlockList)) + return false; + + return true; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index b2e6fe8916..099e369de9 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -16,6 +16,7 @@ // no test file errors here #include "taosdef.h" #include "tsdbint.h" +#include "ttimer.h" #define IS_VALID_PRECISION(precision) \ (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) @@ -126,6 +127,10 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) { terrno = TSDB_CODE_SUCCESS; tsdbStopStream(pRepo); + if(pRepo->tmrCtrl){ + taosTmrCleanUp(pRepo->tmrCtrl); + pRepo->tmrCtrl = NULL; + } if (toCommit) { tsdbSyncCommit(repo); @@ -547,6 +552,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { pRepo->appH = *pAppH; } pRepo->repoLocked = false; + pRepo->tmrCtrl = NULL; int code = pthread_mutex_init(&(pRepo->mutex), NULL); if (code != 0) { diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index e41b544d00..0e0d1759a3 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -32,7 +32,7 @@ extern "C" { #define TSDB_CACHE_PTR_TYPE int64_t #endif -typedef void (*__cache_free_fn_t)(void*); +typedef void (*__cache_free_fn_t)(void*, void*); typedef struct SCacheStatis { int64_t missCount; @@ -176,7 +176,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj); * @param fp * @return */ -void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp); +void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp, void* param1); /** * stop background refresh worker thread diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 69b3741e13..526b3df171 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -140,7 +140,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize); if (pCacheObj->freeFp) { - pCacheObj->freeFp(pNode->data); + pCacheObj->freeFp(pNode->data, NULL); } free(pNode); @@ -174,7 +174,7 @@ static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STr static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) { if (pCacheObj->freeFp) { - pCacheObj->freeFp(pElem->pData->data); + pCacheObj->freeFp(pElem->pData->data, NULL); } free(pElem->pData); @@ -249,7 +249,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v if (ret == 0) { if (T_REF_VAL_GET(p) == 0) { if (pCacheObj->freeFp) { - pCacheObj->freeFp(p->data); + pCacheObj->freeFp(p->data, NULL); } atomic_sub_fetch_64(&pCacheObj->totalSize, p->size); @@ -458,7 +458,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize); if (pCacheObj->freeFp) { - pCacheObj->freeFp(pNode->data); + pCacheObj->freeFp(pNode->data, NULL); } free(pNode); @@ -504,6 +504,7 @@ typedef struct SHashTravSupp { SCacheObj* pCacheObj; int64_t time; __cache_free_fn_t fp; + void* param1; } SHashTravSupp; static bool travHashTableEmptyFn(void* param, void* data) { @@ -662,17 +663,17 @@ bool travHashTableFn(void* param, void* data) { } if (ps->fp) { - (ps->fp)(pNode->data); + (ps->fp)(pNode->data, ps->param1); } // do not remove element in hash table return true; } -static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) { +static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp, void* param1) { assert(pCacheObj != NULL); - SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time}; + SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1}; taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup); } @@ -736,7 +737,7 @@ void* taosCacheTimedRefresh(void *handle) { // refresh data in hash table if (elemInHash > 0) { int64_t now = taosGetTimestampMs(); - doCacheRefresh(pCacheObj, now, NULL); + doCacheRefresh(pCacheObj, now, NULL, NULL); } taosTrashcanEmpty(pCacheObj, false); @@ -753,13 +754,13 @@ void* taosCacheTimedRefresh(void *handle) { return NULL; } -void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) { +void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp, void* param1) { if (pCacheObj == NULL) { return; } int64_t now = taosGetTimestampMs(); - doCacheRefresh(pCacheObj, now, fp); + doCacheRefresh(pCacheObj, now, fp, param1); } void taosStopCacheRefreshWorker() { -- GitLab