diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index b2219a53a7e8ad8e2f45d8aefacd13109e3d41de..089e30ac3728761c68fe155f960c8650a32c2f7a 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -193,7 +193,6 @@ typedef struct { SList * bufBlockList; int64_t pointsAdd; // TODO int64_t storageAdd; // TODO - int64_t commitedMs; // commited ms time , zero is no commit. } SMemTable; typedef struct { diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index a409d955add34d54f025fd98e63a5f4faf670770..7eeaac421aed111d371191d6717a5c8d41087f8f 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -249,7 +249,7 @@ int waitMoment(SQInfo* pQInfo){ taosMsleep(1000); used_ms += 1000; if(isQueryKilled(pQInfo)){ - printf(" check query is canceled, sleep break... \n"); + printf(" check query is canceled, sleep break... %s\n", pQInfo->sql); break; } } @@ -626,7 +626,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qWarn("QId:0x%"PRIx64" query killed becase no memory commit.", pQInfo->qId); + qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId); setQueryKilled(pQInfo); // wait query stop @@ -647,20 +647,19 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo typedef struct { int64_t qId; int64_t startExecTs; - int64_t commitedMs; } SLongQuery; // callbark for sort compare static int compareLongQuery(const void* p1, const void* p2) { // sort desc - SLongQuery* plq1 = (SLongQuery*)p1; - SLongQuery* plq2 = (SLongQuery*)p2; + SLongQuery* plq1 = *(SLongQuery**)p1; + SLongQuery* plq2 = *(SLongQuery**)p2; if(plq1->startExecTs == plq2->startExecTs) { return 0; } else if(plq1->startExecTs > plq2->startExecTs) { - return -1; - } else { return 1; + } else { + return -1; } } @@ -686,15 +685,7 @@ static void cbFoundItem(void* handle, void* param1) { // push to qids SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery)); plq->qId = qInfo->qId; - plq->startExecTs = qInfo->startExecTs; - - // commitedMs - if(imem) { - plq->commitedMs = imem->commitedMs; - } else { - plq->commitedMs = 0; - } - + plq->startExecTs = qInfo->startExecTs; taosArrayPush(qids, &plq); } @@ -735,11 +726,13 @@ bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) { SLongQuery* plq; for(i=0; i < cnt; i++) { plq = (SLongQuery* )taosArrayGetP(qids, i); + printf(" sort i=%d span=%d qid=0x%"PRIx64" exeTime=0x%"PRIx64". \n",(int)i, (int)(now - plq->startExecTs), plq->qId, plq->startExecTs); if(plq->startExecTs > now) continue; if(now - plq->startExecTs >= longQueryMs) { - qKillQueryByQId(pMgmt, plq->qId, 100, 30); // wait 50*100 ms + qKillQueryByQId(pMgmt, plq->qId, 500, 10); // wait 50*100 ms if(tsdbNoProblem(pRepo)) { fixed = true; + qWarn("QId:0x%"PRIx64" fixed problem after kill this query.", plq->qId); break; } } @@ -755,8 +748,9 @@ bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) { //solve tsdb no block to commit bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) { + qWarn("start solve no block problem."); if(qFixedNoBlock(pRepo, pMgmt, 20*1000)) { return true; } return qFixedNoBlock(pRepo, pMgmt, 5*1000); -} \ No newline at end of file +} diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index af75fc45fde1029eace036698883c330e997cb3c..2ba41dca2aece3defee73a677c32754830e83537 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -216,7 +216,10 @@ void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic tsdbFreeBufBlock(pBufBlock); free(pNode); if(bELastic) - pPool->nElasticBlocks--; + { + pPool->nElasticBlocks--; + printf(" elastic block reduce one ok. current blocks=%d \n", pPool->nElasticBlocks); + } else pPool->nBufBlocks--; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 6fae5c6555fbf8a6eb6deb5b746bacd2e896c375..5514fa80c84ae3ab9242e90379820ccd2a68eda4 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -548,7 +548,6 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { pRepo->imem = NULL; (void)tsdbUnlockRepo(pRepo); //save commited time - pIMem->commitedMs = taosGetTimestampMs(); tsdbUnRefMemTable(pRepo, pIMem); tsem_post(&(pRepo->readyToCommit)); } diff --git a/src/tsdb/src/tsdbHealth.c b/src/tsdb/src/tsdbHealth.c index b590df28fe6cb18c8e75963531a9525cb809fb54..dddf40d96377fabbd56718bd6f2525ae95a7ad66 100644 --- a/src/tsdb/src/tsdbHealth.c +++ b/src/tsdb/src/tsdbHealth.c @@ -40,6 +40,7 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) { } else { pPool->nElasticBlocks ++; cnt ++ ; + printf(" elastic block add one ok. current blocks=%d \n", pPool->nElasticBlocks); } } } @@ -68,7 +69,7 @@ bool tsdbUrgeQueryFree(STsdbRepo * pRepo) { bool tsdbIdleMemEnough() { // TODO config to taos.cfg - int32_t lowestRate = 10; // below 10% idle memory, return not enough memory + int32_t lowestRate = 5; // below 10% idle memory, return not enough memory float memoryUsedMB = 0; float memoryAvailMB; @@ -94,8 +95,9 @@ bool tsdbIdleMemEnough() { bool tsdbAllowNewBlock(STsdbRepo* pRepo) { //TODO config to taos.cfg - int32_t nMaxElastic = 0; + int32_t nMaxElastic = 1; STsdbBufPool* pPool = pRepo->pPool; + printf("tsdbAllowNewBlock nElasticBlock(%d) MaxElasticBlocks(%d)\n", pPool->nElasticBlocks, nMaxElastic); if(pPool->nElasticBlocks >= nMaxElastic) { tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic); return false; @@ -106,9 +108,7 @@ bool tsdbAllowNewBlock(STsdbRepo* pRepo) { bool tsdbNoProblem(STsdbRepo* pRepo) { if(!tsdbIdleMemEnough()) return false; - - if(listNEles(pRepo->pPool->bufBlockList)) + if(listNEles(pRepo->pPool->bufBlockList) == 0) return false; - return true; } \ No newline at end of file