提交 3d79ac34 编写于 作者: A AlexDuan

qRequire ref add

上级 0608e908
...@@ -59,7 +59,7 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) { ...@@ -59,7 +59,7 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) {
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
void tscFreeRpcObj(void *param) { void tscFreeRpcObj(void *param, void* param1) {
assert(param); assert(param);
SRpcObj *pRpcObj = (SRpcObj *)(param); SRpcObj *pRpcObj = (SRpcObj *)(param);
tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn); tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn);
......
...@@ -416,8 +416,9 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd); ...@@ -416,8 +416,9 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
int tsdbCompact(STsdbRepo *pRepo); int tsdbCompact(STsdbRepo *pRepo);
// For TSDB Health Monitor // For TSDB Health Monitor
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
bool tsdbIdleMemEnough(); // no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -46,7 +46,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -46,7 +46,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
static void mnodeCancelGetNextConn(void *pIter); static void mnodeCancelGetNextConn(void *pIter);
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mnodeFreeConn(void *data); static void mnodeFreeConn(void *data, void* param1);
static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg);
...@@ -135,7 +135,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po ...@@ -135,7 +135,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
return pConn; return pConn;
} }
static void mnodeFreeConn(void *data) { static void mnodeFreeConn(void *data, void* param1) {
SConnObj *pConn = data; SConnObj *pConn = data;
tfree(pConn->pQueries); tfree(pConn->pQueries);
tfree(pConn->pStreams); tfree(pConn->pStreams);
......
...@@ -46,7 +46,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *mnodeMsg); ...@@ -46,7 +46,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg);
static void mnodeFreeShowObj(void *data); static void mnodeFreeShowObj(void *data, void* param1);
static bool mnodeAccquireShowObj(SShowObj *pShow); static bool mnodeAccquireShowObj(SShowObj *pShow);
static bool mnodeCheckShowFinished(SShowObj *pShow); static bool mnodeCheckShowFinished(SShowObj *pShow);
static void *mnodePutShowObj(SShowObj *pShow); static void *mnodePutShowObj(SShowObj *pShow);
...@@ -420,7 +420,7 @@ static void* mnodePutShowObj(SShowObj *pShow) { ...@@ -420,7 +420,7 @@ static void* mnodePutShowObj(SShowObj *pShow) {
return NULL; return NULL;
} }
static void mnodeFreeShowObj(void *data) { static void mnodeFreeShowObj(void *data, void* param1) {
SShowObj *pShow = *(SShowObj **)data; SShowObj *pShow = *(SShowObj **)data;
if (tsMnodeShowFreeIterFp[pShow->type] != NULL) { if (tsMnodeShowFreeIterFp[pShow->type] != NULL) {
if (pShow->pVgIter != NULL) { if (pShow->pVgIter != NULL) {
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "httpContext.h" #include "httpContext.h"
#include "httpParser.h" #include "httpParser.h"
static void httpDestroyContext(void *data); static void httpDestroyContext(void *data, void* param1);
static void httpRemoveContextFromEpoll(HttpContext *pContext) { static void httpRemoveContextFromEpoll(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread; HttpThread *pThread = pContext->pThread;
...@@ -44,7 +44,7 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) { ...@@ -44,7 +44,7 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) {
} }
} }
static void httpDestroyContext(void *data) { static void httpDestroyContext(void *data, void* param1) {
HttpContext *pContext = *(HttpContext **)data; HttpContext *pContext = *(HttpContext **)data;
if (pContext->fd > 0) taosCloseSocket(pContext->fd); if (pContext->fd > 0) taosCloseSocket(pContext->fd);
......
...@@ -95,7 +95,7 @@ void httpReleaseSession(HttpContext *pContext) { ...@@ -95,7 +95,7 @@ void httpReleaseSession(HttpContext *pContext) {
pContext->session = NULL; pContext->session = NULL;
} }
static void httpDestroySession(void *data) { static void httpDestroySession(void *data, void* param1) {
HttpSession *session = data; HttpSession *session = data;
httpDebug("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount); httpDebug("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount);
......
...@@ -2387,7 +2387,7 @@ bool isQueryKilled(SQInfo *pQInfo) { ...@@ -2387,7 +2387,7 @@ bool isQueryKilled(SQInfo *pQInfo) {
(!needBuildResAfterQueryComplete(pQInfo))) { (!needBuildResAfterQueryComplete(pQInfo))) {
assert(pQInfo->startExecTs != 0); assert(pQInfo->startExecTs != 0);
qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d sec, abort current query execution, start:%" PRId64 qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
return true; return true;
} }
......
...@@ -35,12 +35,12 @@ typedef struct SQueryMgmt { ...@@ -35,12 +35,12 @@ typedef struct SQueryMgmt {
bool closed; bool closed;
} SQueryMgmt; } SQueryMgmt;
static void queryMgmtKillQueryFn(void* handle) { static void queryMgmtKillQueryFn(void* handle, void* param1) {
void** fp = (void**)handle; void** fp = (void**)handle;
qKillQuery(*fp); qKillQuery(*fp);
} }
static void freeqinfoFn(void *qhandle) { static void freeqinfoFn(void *qhandle, void* param1) {
void** handle = qhandle; void** handle = qhandle;
if (handle == NULL || *handle == NULL) { if (handle == NULL || *handle == NULL) {
return; return;
...@@ -254,8 +254,6 @@ int waitMoment(SQInfo* pQInfo){ ...@@ -254,8 +254,6 @@ int waitMoment(SQInfo* pQInfo){
} }
} }
} }
taosMsleep(ms);
} }
return 1; return 1;
} }
...@@ -274,7 +272,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { ...@@ -274,7 +272,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
} }
*qId = pQInfo->qId; *qId = pQInfo->qId;
pQInfo->startExecTs = taosGetTimestampSec(); pQInfo->startExecTs = taosGetTimestampMs();
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId); qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId);
...@@ -522,7 +520,7 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) { ...@@ -522,7 +520,7 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
pQueryMgmt->closed = true; pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock); pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn); taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn, NULL);
} }
void qQueryMgmtReOpen(void *pQMgmt) { void qQueryMgmtReOpen(void *pQMgmt) {
...@@ -641,95 +639,124 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo ...@@ -641,95 +639,124 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
} }
} }
qReleaseQInfo(pMgmt, (void **)&handle, true);
return error; return error;
} }
// local struct // local struct
typedef struct { typedef struct {
int64_t qId; int64_t qId;
int32_t timeMs; int64_t startExecTs;
int64_t commitedMs;
} SLongQuery; } SLongQuery;
// compare // callbark for sort compare
int compareLongQuery(const void* p1, const void* p2) { static int compareLongQuery(const void* p1, const void* p2) {
// sort desc // sort desc
SLongQuery* plq1 = (SLongQuery*)p1; SLongQuery* plq1 = (SLongQuery*)p1;
SLongQuery* plq2 = (SLongQuery*)p2; SLongQuery* plq2 = (SLongQuery*)p2;
if(plq1->timeMs == plq2->timeMs) { if(plq1->startExecTs == plq2->startExecTs) {
return 0; return 0;
} else if(plq1->timeMs > plq2->timeMs) { } else if(plq1->startExecTs > plq2->startExecTs) {
return -1; return -1;
} else { } else {
return 1; return 1;
} }
} }
// callback for taosCacheRefresh
static void cbFoundItem(void* handle, void* param1) {
SQInfo * qInfo = *(SQInfo**) handle;
if(qInfo == NULL) return ;
SArray* qids = (SArray*) param1;
if(qids == NULL) return ;
bool usedMem = true;
bool usedIMem = true;
SMemTable* mem = qInfo->query.memRef.snapshot.omem;
SMemTable* imem = qInfo->query.memRef.snapshot.imem;
if(mem == NULL || T_REF_VAL_GET(mem) == 0)
usedMem = false;
if(imem == NULL || T_REF_VAL_GET(mem) == 0)
usedIMem = false ;
if(!usedMem && !usedIMem)
return ;
// push to qids
SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery));
plq->qId = qInfo->qId;
plq->startExecTs = qInfo->startExecTs;
// commitedMs
if(imem) {
plq->commitedMs = imem->commitedMs;
} else {
plq->commitedMs = 0;
}
taosArrayPush(qids, &plq);
}
// longquery // longquery
void* qObtainLongQuery(void* param, int32_t longMs){ void* qObtainLongQuery(void* param){
SQueryMgmt* qMgmt = (SQueryMgmt*)param; SQueryMgmt* qMgmt = (SQueryMgmt*)param;
if(qMgmt == NULL || qMgmt->qinfoPool == NULL) return NULL; if(qMgmt == NULL || qMgmt->qinfoPool == NULL)
return NULL;
SArray* qids = taosArrayInit(4, sizeof(int64_t*)); SArray* qids = taosArrayInit(4, sizeof(int64_t*));
if(qids == NULL) return NULL;
// Get each item
taosCacheRefresh(qMgmt->qinfoPool, cbFoundItem, qids);
SHashObj* pHashTable = qMgmt->qinfoPool->pHashTable;
if(pHashTable == NULL || pHashTable->hashList == NULL) return NULL;
SQInfo * qInfo = (SQInfo*)taosHashIterate(pHashTable, NULL);
while(qInfo){
// judge long query
SMemTable* imem = qInfo->runtimeEnv.pQueryAttr->memRef.snapshot.imem;
if(imem == NULL || imem->commitedMs == 0) continue;
int64_t now = taosGetTimestampMs();
if(imem->commitedMs > now) continue; // weird, so skip
int32_t passMs = now - imem->commitedMs;
if(passMs < longMs) {
continue;
}
// push
SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery));
plq->timeMs = passMs;
plq->qId = qInfo->qId;
taosArrayPush(qids, plq);
// next
qInfo = (SQInfo*)taosHashIterate(pHashTable, qInfo);
}
size_t cnt = taosArrayGetSize(qids); size_t cnt = taosArrayGetSize(qids);
if(cnt == 0) { if(cnt == 0) {
taosArrayDestroyEx(qids, free); taosArrayDestroy(qids);
return NULL; return NULL;
} }
if(cnt > 1) { if(cnt > 1)
taosArraySort(qids, compareLongQuery); taosArraySort(qids, compareLongQuery);
}
return qids; return qids;
} }
//solve tsdb no block to commit //solve tsdb no block to commit
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) { bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) {
SQueryMgmt *pQueryMgmt = pMgmt; SQueryMgmt *pQueryMgmt = pMgmt;
int32_t longMs = 2000; // TODO config to taos.cfg bool fixed = false;
// qid top list // qid top list
SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt, longMs); SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt);
if(qids == NULL) return false; if(qids == NULL) return false;
// kill Query // kill Query
int64_t now = taosGetTimestampMs();
size_t cnt = taosArrayGetSize(qids); size_t cnt = taosArrayGetSize(qids);
size_t i;
SLongQuery* plq; SLongQuery* plq;
for(size_t i=0; i < cnt; i++) { for(i=0; i < cnt; i++) {
plq = (SLongQuery* )taosArrayGetP(qids, i); plq = (SLongQuery* )taosArrayGetP(qids, i);
qKillQueryByQId(pMgmt, plq->qId, 100, 50); // wait 50*100 ms if(plq->startExecTs > now) continue;
if(now - plq->startExecTs >= longQueryMs) {
// check break condition qKillQueryByQId(pMgmt, plq->qId, 100, 30); // wait 50*100 ms
if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) { if(tsdbNoProblem(pRepo)) {
break; fixed = true;
break;
}
} }
} }
// free qids // free qids
taosArrayDestroyEx(qids, free); for(i=0; i < cnt; i++) {
return true; free(taosArrayGetP(qids, i));
}
taosArrayDestroy(qids);
return fixed;
}
//solve tsdb no block to commit
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) {
if(qFixedNoBlock(pRepo, pMgmt, 20*1000)) {
return true;
}
return qFixedNoBlock(pRepo, pMgmt, 5*1000);
} }
\ No newline at end of file
...@@ -19,4 +19,7 @@ ...@@ -19,4 +19,7 @@
bool tsdbUrgeQueryFree(STsdbRepo* pRepo); bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo); int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
bool tsdbIdleMemEnough();
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
#endif /* _TD_TSDB_BUFFER_H_ */ #endif /* _TD_TSDB_BUFFER_H_ */
...@@ -97,6 +97,7 @@ struct STsdbRepo { ...@@ -97,6 +97,7 @@ struct STsdbRepo {
SMergeBuf mergeBuf; //used when update=2 SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
void* tmrCtrl;
}; };
#define REPO_ID(r) (r)->config.tsdbId #define REPO_ID(r) (r)->config.tsdbId
......
...@@ -64,6 +64,10 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { ...@@ -64,6 +64,10 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
ASSERT(pPool != NULL); ASSERT(pPool != NULL);
// debug test
pCfg->cacheBlockSize = 1;
pCfg->totalBlocks = 4;
pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
pPool->tBufBlocks = pCfg->totalBlocks; pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0; pPool->nBufBlocks = 0;
...@@ -119,16 +123,22 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { ...@@ -119,16 +123,22 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) { while (POOL_IS_EMPTY(pBufPool)) {
tsdbWarn("vgId:%d Pool empty,nBufBlocks=%d nElastic=%d nRecycle=%d", REPO_ID(pRepo), pBufPool->nBufBlocks, pBufPool->nElasticBlocks, pBufPool->nRecycleBlocks);
// supply new Block // supply new Block
if(tsdbInsertNewBlock(pRepo) > 0) { if(tsdbInsertNewBlock(pRepo) > 0) {
tsdbWarn("vgId:%d Insert new block to solve.", REPO_ID(pRepo));
break; break;
} else { } else {
// no newBlock, kill query free // no newBlock, kill query free
tsdbUrgeQueryFree(pRepo); if(!tsdbUrgeQueryFree(pRepo)) {
tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
}
} }
pRepo->repoLocked = false; pRepo->repoLocked = false;
tsdbDebug("vgId:%d wait for new block...", REPO_ID(pRepo));
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
tsdbDebug("vgId:%d waited new block ok.", REPO_ID(pRepo));
pRepo->repoLocked = true; pRepo->repoLocked = true;
} }
......
...@@ -18,12 +18,14 @@ ...@@ -18,12 +18,14 @@
#include "tarray.h" #include "tarray.h"
#include "query.h" #include "query.h"
#include "tglobal.h" #include "tglobal.h"
#include "tlist.h"
#include "tsdbint.h" #include "tsdbint.h"
#include "tsdbBuffer.h" #include "tsdbBuffer.h"
#include "tsdbLog.h" #include "tsdbLog.h"
#include "tsdbHealth.h" #include "tsdbHealth.h"
#include "ttimer.h" #include "ttimer.h"
// return malloc new block count // return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
STsdbBufPool *pPool = pRepo->pPool; STsdbBufPool *pPool = pRepo->pPool;
...@@ -56,24 +58,28 @@ void cbKillQueryFree(void* param1, void* param2) { ...@@ -56,24 +58,28 @@ void cbKillQueryFree(void* param1, void* param2) {
// return true do free , false do nothing // return true do free , false do nothing
bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
// 1 start timer // 1 start timer
tmr_h hTimer = taosTmrStart(cbKillQueryFree, 1, pRepo, NULL); if(pRepo->tmrCtrl == NULL){
pRepo->tmrCtrl = taosTmrInit(0, 0, 0, "REPO");
}
tmr_h hTimer = taosTmrStart(cbKillQueryFree, 1, pRepo, pRepo->tmrCtrl);
return hTimer != NULL; return hTimer != NULL;
} }
bool tsdbIdleMemEnough() { bool tsdbIdleMemEnough() {
// TODO config to taos.cfg // TODO config to taos.cfg
int32_t lowestRate = 20; // below 20% idle memory, return not enough memory int32_t lowestRate = 10; // below 10% idle memory, return not enough memory
float memoryUsedMB = 0; float memoryUsedMB = 0;
float memoryAvailMB; float memoryAvailMB;
if (true != taosGetSysMemory(&memoryUsedMB)) { if (!taosGetSysMemory(&memoryUsedMB)) {
tsdbWarn("tsdbHealth get memory error, return false."); tsdbWarn("tsdbHealth get memory error, return false.");
return false; return true;
} }
if(memoryUsedMB > tsTotalMemoryMB || tsTotalMemoryMB == 0) { if(memoryUsedMB > tsTotalMemoryMB || tsTotalMemoryMB == 0) {
tsdbWarn("tsdbHealth used memory(%d MB) large total memory(%d MB), return false.", (int)memoryUsedMB, (int)tsTotalMemoryMB); tsdbWarn("tsdbHealth used memory(%d MB) large total memory(%d MB), return false.", (int)memoryUsedMB, (int)tsTotalMemoryMB);
return false; return true;
} }
memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB; memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB;
...@@ -88,7 +94,7 @@ bool tsdbIdleMemEnough() { ...@@ -88,7 +94,7 @@ bool tsdbIdleMemEnough() {
bool tsdbAllowNewBlock(STsdbRepo* pRepo) { bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
//TODO config to taos.cfg //TODO config to taos.cfg
int32_t nMaxElastic = 3; int32_t nMaxElastic = 0;
STsdbBufPool* pPool = pRepo->pPool; STsdbBufPool* pPool = pRepo->pPool;
if(pPool->nElasticBlocks >= nMaxElastic) { if(pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic); tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic);
...@@ -96,3 +102,13 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) { ...@@ -96,3 +102,13 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
} }
return true; return true;
} }
bool tsdbNoProblem(STsdbRepo* pRepo) {
if(!tsdbIdleMemEnough())
return false;
if(listNEles(pRepo->pPool->bufBlockList))
return false;
return true;
}
\ No newline at end of file
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
// no test file errors here // no test file errors here
#include "taosdef.h" #include "taosdef.h"
#include "tsdbint.h" #include "tsdbint.h"
#include "ttimer.h"
#define IS_VALID_PRECISION(precision) \ #define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
...@@ -126,6 +127,10 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) { ...@@ -126,6 +127,10 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
tsdbStopStream(pRepo); tsdbStopStream(pRepo);
if(pRepo->tmrCtrl){
taosTmrCleanUp(pRepo->tmrCtrl);
pRepo->tmrCtrl = NULL;
}
if (toCommit) { if (toCommit) {
tsdbSyncCommit(repo); tsdbSyncCommit(repo);
...@@ -547,6 +552,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -547,6 +552,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->appH = *pAppH; pRepo->appH = *pAppH;
} }
pRepo->repoLocked = false; pRepo->repoLocked = false;
pRepo->tmrCtrl = NULL;
int code = pthread_mutex_init(&(pRepo->mutex), NULL); int code = pthread_mutex_init(&(pRepo->mutex), NULL);
if (code != 0) { if (code != 0) {
......
...@@ -32,7 +32,7 @@ extern "C" { ...@@ -32,7 +32,7 @@ extern "C" {
#define TSDB_CACHE_PTR_TYPE int64_t #define TSDB_CACHE_PTR_TYPE int64_t
#endif #endif
typedef void (*__cache_free_fn_t)(void*); typedef void (*__cache_free_fn_t)(void*, void*);
typedef struct SCacheStatis { typedef struct SCacheStatis {
int64_t missCount; int64_t missCount;
...@@ -176,7 +176,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj); ...@@ -176,7 +176,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj);
* @param fp * @param fp
* @return * @return
*/ */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp); void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp, void* param1);
/** /**
* stop background refresh worker thread * stop background refresh worker thread
......
...@@ -140,7 +140,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo ...@@ -140,7 +140,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize); pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
if (pCacheObj->freeFp) { if (pCacheObj->freeFp) {
pCacheObj->freeFp(pNode->data); pCacheObj->freeFp(pNode->data, NULL);
} }
free(pNode); free(pNode);
...@@ -174,7 +174,7 @@ static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STr ...@@ -174,7 +174,7 @@ static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STr
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) { static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) {
if (pCacheObj->freeFp) { if (pCacheObj->freeFp) {
pCacheObj->freeFp(pElem->pData->data); pCacheObj->freeFp(pElem->pData->data, NULL);
} }
free(pElem->pData); free(pElem->pData);
...@@ -249,7 +249,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v ...@@ -249,7 +249,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
if (ret == 0) { if (ret == 0) {
if (T_REF_VAL_GET(p) == 0) { if (T_REF_VAL_GET(p) == 0) {
if (pCacheObj->freeFp) { if (pCacheObj->freeFp) {
pCacheObj->freeFp(p->data); pCacheObj->freeFp(p->data, NULL);
} }
atomic_sub_fetch_64(&pCacheObj->totalSize, p->size); atomic_sub_fetch_64(&pCacheObj->totalSize, p->size);
...@@ -458,7 +458,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -458,7 +458,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize); pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);
if (pCacheObj->freeFp) { if (pCacheObj->freeFp) {
pCacheObj->freeFp(pNode->data); pCacheObj->freeFp(pNode->data, NULL);
} }
free(pNode); free(pNode);
...@@ -504,6 +504,7 @@ typedef struct SHashTravSupp { ...@@ -504,6 +504,7 @@ typedef struct SHashTravSupp {
SCacheObj* pCacheObj; SCacheObj* pCacheObj;
int64_t time; int64_t time;
__cache_free_fn_t fp; __cache_free_fn_t fp;
void* param1;
} SHashTravSupp; } SHashTravSupp;
static bool travHashTableEmptyFn(void* param, void* data) { static bool travHashTableEmptyFn(void* param, void* data) {
...@@ -662,17 +663,17 @@ bool travHashTableFn(void* param, void* data) { ...@@ -662,17 +663,17 @@ bool travHashTableFn(void* param, void* data) {
} }
if (ps->fp) { if (ps->fp) {
(ps->fp)(pNode->data); (ps->fp)(pNode->data, ps->param1);
} }
// do not remove element in hash table // do not remove element in hash table
return true; return true;
} }
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) { static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp, void* param1) {
assert(pCacheObj != NULL); assert(pCacheObj != NULL);
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time}; SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup); taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
} }
...@@ -736,7 +737,7 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -736,7 +737,7 @@ void* taosCacheTimedRefresh(void *handle) {
// refresh data in hash table // refresh data in hash table
if (elemInHash > 0) { if (elemInHash > 0) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, NULL); doCacheRefresh(pCacheObj, now, NULL, NULL);
} }
taosTrashcanEmpty(pCacheObj, false); taosTrashcanEmpty(pCacheObj, false);
...@@ -753,13 +754,13 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -753,13 +754,13 @@ void* taosCacheTimedRefresh(void *handle) {
return NULL; return NULL;
} }
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) { void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp, void* param1) {
if (pCacheObj == NULL) { if (pCacheObj == NULL) {
return; return;
} }
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, fp); doCacheRefresh(pCacheObj, now, fp, param1);
} }
void taosStopCacheRefreshWorker() { void taosStopCacheRefreshWorker() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册