提交 79a18b24 编写于 作者: A AlexDuan

tsdb call vnode method change to callback notify

上级 f0ce38e7
......@@ -79,6 +79,8 @@ int32_t qKillQuery(qinfo_t qinfo);
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount);
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt);
int32_t qQueryCompleted(qinfo_t qinfo);
/**
......@@ -97,15 +99,6 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
bool checkQIdEqual(void *qHandle, uint64_t qId);
int64_t genQueryId(void);
// util
typedef struct {
int64_t qId;
int32_t timeMs;
} SLongQuery;
// return SArray* include SLongQuery*
void* qObtainLongQuery(void* qMgmt, int32_t longMs);
#ifdef __cplusplus
}
#endif
......
......@@ -39,6 +39,7 @@ extern "C" {
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved
// TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0
......@@ -414,6 +415,10 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo);
// For TSDB Health Monitor
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
bool tsdbIdleMemEnough();
#ifdef __cplusplus
}
#endif
......
......@@ -628,8 +628,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:0x%"PRIx64" query killed by qid.", pQInfo->qId);
qWarn("QId:0x%"PRIx64" query killed becase no memory commit.", pQInfo->qId);
setQueryKilled(pQInfo);
// wait query stop
......@@ -645,6 +644,13 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
return error;
}
// local struct
typedef struct {
int64_t qId;
int32_t timeMs;
} SLongQuery;
// compare
int compareLongQuery(const void* p1, const void* p2) {
// sort desc
SLongQuery* plq1 = (SLongQuery*)p1;
......@@ -658,7 +664,7 @@ int compareLongQuery(const void* p1, const void* p2) {
}
}
// util
// longquery
void* qObtainLongQuery(void* param, int32_t longMs){
SQueryMgmt* qMgmt = (SQueryMgmt*)param;
if(qMgmt == NULL || qMgmt->qinfoPool == NULL) return NULL;
......@@ -700,4 +706,30 @@ void* qObtainLongQuery(void* param, int32_t longMs){
}
return qids;
}
//solve tsdb no block to commit
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) {
SQueryMgmt *pQueryMgmt = pMgmt;
int32_t longMs = 2000; // TODO config to taos.cfg
// qid top list
SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt, longMs);
if(qids == NULL) return false;
// kill Query
size_t cnt = taosArrayGetSize(qids);
SLongQuery* plq;
for(size_t i=0; i < cnt; i++) {
plq = (SLongQuery* )taosArrayGetP(qids, i);
qKillQueryByQId(pMgmt, plq->qId, 100, 50); // wait 50*100 ms
// check break condition
if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) {
break;
}
}
// free qids
taosArrayDestroyEx(qids, free);
return true;
}
\ No newline at end of file
......@@ -19,7 +19,4 @@
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
bool enoughIdleMemory();
bool allowNewBlock(STsdbRepo* pRepo);
#endif /* _TD_TSDB_BUFFER_H_ */
......@@ -24,19 +24,12 @@
#include "tsdbHealth.h"
#include "ttimer.h"
#include "../../vnode/inc/vnodeInt.h"
// get qmgmt
void* vnodeGetqMgmt(void* pVnode){
if(pVnode == NULL) return NULL;
return ((SVnodeObj*)pVnode)->qMgmt;
}
// return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
STsdbBufPool *pPool = pRepo->pPool;
int32_t cnt = 0;
if(enoughIdleMemory() && allowNewBlock(pRepo)) {
if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) {
if (tsdbLockRepo(pRepo) >= 0) {
......@@ -57,36 +50,10 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
// switch anther thread to run
void cbKillQueryFree(void* param1, void* param2) {
STsdbRepo* pRepo = (STsdbRepo*)param1;
int32_t longMs = 2000; // TODO config to taos.cfg
// vnode
void* vnodeObj = pRepo->appH.appH;
if(vnodeObj == NULL) return ;
// qMgmt
void* qMgmt = vnodeGetqMgmt(vnodeObj);
if(qMgmt == NULL) return ;
// qid top list
SArray *qids = (SArray*)qObtainLongQuery(qMgmt, longMs);
if(qids == NULL) return ;
// kill Query
size_t cnt = taosArrayGetSize(qids);
int64_t qId = 0;
for(size_t i=0; i < cnt; i++) {
qId = *(int64_t*)taosArrayGetP(qids, i);
qKillQueryByQId(qMgmt, qId, 100, 50); // wait 50*100 ms
// notify wait
pthread_cond_signal(&pRepo->pPool->poolNotEmpty);
// check break condition
if(enoughIdleMemory() && allowNewBlock(pRepo)) {
break;
}
if(pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
}
// free qids
taosArrayDestroyEx(qids, free);
}
// return true do free , false do nothing
......@@ -96,7 +63,7 @@ bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
return hTimer != NULL;
}
bool enoughIdleMemory(){
bool tsdbIdleMemEnough() {
// TODO config to taos.cfg
int32_t lowestRate = 20; // below 20% idle memory, return not enough memory
float memoryUsedMB = 0;
......@@ -122,7 +89,7 @@ bool enoughIdleMemory(){
return true;
}
bool allowNewBlock(STsdbRepo* pRepo){
bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
//TODO config to taos.cfg
int32_t nElasticBlocks = 10;
STsdbBufPool* pPool = pRepo->pPool;
......
......@@ -560,5 +560,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
return vnodeSaveVersion(pVnode);
}
// timer thread callback
if(status == TSDB_STATUS_COMMIT_NOBLOCK) {
qSolveCommitNoBlock(pVnode->tsdb, pVnode->qMgmt);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册