提交 951ff007 编写于 作者: A AlexDuan

add switch and elastic block equal totalBlocks 1/3

上级 22584e17
...@@ -221,6 +221,8 @@ extern uint32_t maxRange; ...@@ -221,6 +221,8 @@ extern uint32_t maxRange;
extern uint32_t curRange; extern uint32_t curRange;
extern char Compressor[]; extern char Compressor[];
#endif #endif
// long query
extern int8_t tsDeathLockKillQuery;
typedef struct { typedef struct {
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
......
...@@ -276,6 +276,9 @@ uint32_t curRange = 100; // range ...@@ -276,6 +276,9 @@ uint32_t curRange = 100; // range
char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
#endif #endif
// long query death-lock
int8_t tsDeathLockKillQuery = 0;
int32_t (*monStartSystemFp)() = NULL; int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL; void (*monStopSystemFp)() = NULL;
void (*monExecuteSQLFp)(char *sql) = NULL; void (*monExecuteSQLFp)(char *sql) = NULL;
...@@ -1647,6 +1650,16 @@ static void doInitGlobalConfig(void) { ...@@ -1647,6 +1650,16 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
#endif #endif
// enable kill long query
cfg.option = "deathLockKillQuery";
cfg.ptr = &tsDeathLockKillQuery;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
} }
void taosInitGlobalCfg() { void taosInitGlobalCfg() {
......
...@@ -123,22 +123,20 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { ...@@ -123,22 +123,20 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) { while (POOL_IS_EMPTY(pBufPool)) {
tsdbWarn("vgId:%d Pool empty,nBufBlocks=%d nElastic=%d nRecycle=%d", REPO_ID(pRepo), pBufPool->nBufBlocks, pBufPool->nElasticBlocks, pBufPool->nRecycleBlocks); if(tsDeathLockKillQuery) {
// supply new Block // supply new Block
if(tsdbInsertNewBlock(pRepo) > 0) { if(tsdbInsertNewBlock(pRepo) > 0) {
tsdbWarn("vgId:%d Insert new block to solve.", REPO_ID(pRepo)); tsdbWarn("vgId:%d Insert elastic new block to solve.", REPO_ID(pRepo));
break; break;
} else { } else {
// no newBlock, kill query free // no newBlock, kill query free
if(!tsdbUrgeQueryFree(pRepo)) { if(!tsdbUrgeQueryFree(pRepo))
tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo)); tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
} }
} }
pRepo->repoLocked = false; pRepo->repoLocked = false;
tsdbDebug("vgId:%d wait for new block...", REPO_ID(pRepo));
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
tsdbDebug("vgId:%d waited new block ok.", REPO_ID(pRepo));
pRepo->repoLocked = true; pRepo->repoLocked = true;
} }
...@@ -160,7 +158,7 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { ...@@ -160,7 +158,7 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize); STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize);
if (pBufBlock == NULL) { if (pBufBlock == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; return NULL;
} }
pBufBlock->blockId = 0; pBufBlock->blockId = 0;
...@@ -168,10 +166,6 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { ...@@ -168,10 +166,6 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
pBufBlock->remain = bufBlockSize; pBufBlock->remain = bufBlockSize;
return pBufBlock; return pBufBlock;
_err:
tsdbFreeBufBlock(pBufBlock);
return NULL;
} }
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
...@@ -216,10 +210,7 @@ void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic ...@@ -216,10 +210,7 @@ void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic
tsdbFreeBufBlock(pBufBlock); tsdbFreeBufBlock(pBufBlock);
free(pNode); free(pNode);
if(bELastic) if(bELastic)
{
pPool->nElasticBlocks--; pPool->nElasticBlocks--;
printf(" elastic block reduce one ok. current blocks=%d \n", pPool->nElasticBlocks);
}
else else
pPool->nBufBlocks--; pPool->nBufBlocks--;
} }
\ No newline at end of file
...@@ -31,7 +31,7 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { ...@@ -31,7 +31,7 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
STsdbBufPool *pPool = pRepo->pPool; STsdbBufPool *pPool = pRepo->pPool;
int32_t cnt = 0; int32_t cnt = 0;
if(tsdbIdleMemEnough() && tsdbAllowNewBlock(pRepo)) { if(tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize); STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) { if (pBufBlock) {
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) { if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
...@@ -67,37 +67,9 @@ bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { ...@@ -67,37 +67,9 @@ bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
return hTimer != NULL; return hTimer != NULL;
} }
bool tsdbIdleMemEnough() {
// TODO config to taos.cfg
int32_t lowestRate = 5; // below 10% idle memory, return not enough memory
float memoryUsedMB = 0;
float memoryAvailMB;
if (!taosGetSysMemory(&memoryUsedMB)) {
tsdbWarn("tsdbHealth get memory error, return false.");
return true;
}
if(memoryUsedMB > tsTotalMemoryMB || tsTotalMemoryMB == 0) {
tsdbWarn("tsdbHealth used memory(%d MB) large total memory(%d MB), return false.", (int)memoryUsedMB, (int)tsTotalMemoryMB);
return true;
}
memoryAvailMB = (float)tsTotalMemoryMB - memoryUsedMB;
int32_t rate = (int32_t)(memoryAvailMB/tsTotalMemoryMB * 100);
if(rate < lowestRate){
tsdbWarn("tsdbHealth real rate :%d less than lowest rate:%d, so return false.", rate, lowestRate);
return false;
}
return true;
}
bool tsdbAllowNewBlock(STsdbRepo* pRepo) { bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
//TODO config to taos.cfg int32_t nMaxElastic = pRepo->config.totalBlocks/3;
int32_t nMaxElastic = 1;
STsdbBufPool* pPool = pRepo->pPool; STsdbBufPool* pPool = pRepo->pPool;
printf("tsdbAllowNewBlock nElasticBlock(%d) MaxElasticBlocks(%d)\n", pPool->nElasticBlocks, nMaxElastic);
if(pPool->nElasticBlocks >= nMaxElastic) { if(pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic); tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic);
return false; return false;
...@@ -106,8 +78,6 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) { ...@@ -106,8 +78,6 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
} }
bool tsdbNoProblem(STsdbRepo* pRepo) { bool tsdbNoProblem(STsdbRepo* pRepo) {
if(!tsdbIdleMemEnough())
return false;
if(listNEles(pRepo->pPool->bufBlockList) == 0) if(listNEles(pRepo->pPool->bufBlockList) == 0)
return false; return false;
return true; return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册