未验证 提交 7f9169ca 编写于 作者: A Alex Duan 提交者: GitHub

Merge pull request #7545 from taosdata/long_query

Long query
......@@ -224,6 +224,8 @@ extern uint32_t maxRange;
extern uint32_t curRange;
extern char Compressor[];
#endif
// long query
extern int8_t tsDeadLockKillQuery;
typedef struct {
char dir[TSDB_FILENAME_LEN];
......
......@@ -279,6 +279,9 @@ uint32_t curRange = 100; // range
char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
#endif
// long query death-lock
int8_t tsDeadLockKillQuery = 0;
int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL;
void (*monExecuteSQLFp)(char *sql) = NULL;
......@@ -1613,7 +1616,17 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum <= TSDB_CFG_MAX_NUM);
// enable kill long query
cfg.option = "deadLockKillQuery";
cfg.ptr = &tsDeadLockKillQuery;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ
// lossy compress
cfg.option = "lossyColumns";
......@@ -1667,6 +1680,9 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM);
#else
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
#endif
}
......
......@@ -76,6 +76,11 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo);
*/
int32_t qKillQuery(qinfo_t qinfo);
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount);
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt);
int32_t qQueryCompleted(qinfo_t qinfo);
/**
......
......@@ -35,6 +35,7 @@ int32_t* taosGetErrno();
#define terrno (*taosGetErrno())
#define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
// rpc
#define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //"Action in progress")
......
......@@ -39,6 +39,7 @@ extern "C" {
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved
// TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0
......@@ -413,6 +414,11 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo);
// For TSDB Health Monitor
// no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo);
#ifdef __cplusplus
}
#endif
......
......@@ -2398,11 +2398,11 @@ bool isQueryKilled(SQInfo *pQInfo) {
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
if (pQInfo->owner != 0 && ((taosGetTimestampSec() - pQInfo->startExecTs) > getMaximumIdleDurationSec()) &&
if (pQInfo->owner != 0 && ((taosGetTimestampSec() - pQInfo->startExecTs/1000) > getMaximumIdleDurationSec()) &&
(!needBuildResAfterQueryComplete(pQInfo))) {
assert(pQInfo->startExecTs != 0);
qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d sec, abort current query execution, start:%" PRId64
qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
return true;
}
......@@ -8409,6 +8409,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
}
pQInfo->qId = qId;
pQInfo->startExecTs = 0;
pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
......
......@@ -35,7 +35,7 @@ typedef struct SQueryMgmt {
bool closed;
} SQueryMgmt;
static void queryMgmtKillQueryFn(void* handle) {
static void queryMgmtKillQueryFn(void* handle, void* param1) {
void** fp = (void**)handle;
qKillQuery(*fp);
}
......@@ -215,6 +215,51 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return code;
}
#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo){
if(pQInfo->sql) {
int ms = 0;
char* pcnt = strstr(pQInfo->sql, " count(*)");
if(pcnt) return 0;
char* pos = strstr(pQInfo->sql, " t_");
if(pos){
pos += 3;
ms = atoi(pos);
while(*pos >= '0' && *pos <= '9'){
pos ++;
}
char unit_char = *pos;
if(unit_char == 'h'){
ms *= 3600*1000;
} else if(unit_char == 'm'){
ms *= 60*1000;
} else if(unit_char == 's'){
ms *= 1000;
}
}
if(ms == 0) return 0;
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
if(ms < 1000) {
taosMsleep(ms);
} else {
int used_ms = 0;
while(used_ms < ms) {
taosMsleep(1000);
used_ms += 1000;
if(isQueryKilled(pQInfo)){
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
break;
}
}
}
}
return 1;
}
#endif
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo);
......@@ -228,7 +273,8 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
}
*qId = pQInfo->qId;
pQInfo->startExecTs = taosGetTimestampSec();
if(pQInfo->startExecTs == 0)
pQInfo->startExecTs = taosGetTimestampMs();
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId);
......@@ -259,7 +305,9 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
int64_t st = taosGetTimestampUs();
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
#ifdef TEST_IMPL
waitMoment(pQInfo);
#endif
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
......@@ -479,7 +527,7 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn);
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn, NULL);
}
void qQueryMgmtReOpen(void *pQMgmt) {
......@@ -574,3 +622,148 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
return 0;
}
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
int32_t error = TSDB_CODE_SUCCESS;
void** handle = qAcquireQInfo(pMgmt, qId);
if(handle == NULL) return terrno;
SQInfo* pQInfo = (SQInfo*)(*handle);
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
setQueryKilled(pQInfo);
// wait query stop
int32_t loop = 0;
while (pQInfo->owner != 0) {
taosMsleep(waitMs);
if(loop++ > waitCount){
error = TSDB_CODE_FAILED;
break;
}
}
qReleaseQInfo(pMgmt, (void **)&handle, true);
return error;
}
// local struct
typedef struct {
int64_t qId;
int64_t startExecTs;
} SLongQuery;
// callbark for sort compare
static int compareLongQuery(const void* p1, const void* p2) {
// sort desc
SLongQuery* plq1 = *(SLongQuery**)p1;
SLongQuery* plq2 = *(SLongQuery**)p2;
if(plq1->startExecTs == plq2->startExecTs) {
return 0;
} else if(plq1->startExecTs > plq2->startExecTs) {
return 1;
} else {
return -1;
}
}
// callback for taosCacheRefresh
static void cbFoundItem(void* handle, void* param1) {
SQInfo * qInfo = *(SQInfo**) handle;
if(qInfo == NULL) return ;
SArray* qids = (SArray*) param1;
if(qids == NULL) return ;
bool usedMem = true;
bool usedIMem = true;
SMemTable* mem = qInfo->query.memRef.snapshot.omem;
SMemTable* imem = qInfo->query.memRef.snapshot.imem;
if(mem == NULL || T_REF_VAL_GET(mem) == 0)
usedMem = false;
if(imem == NULL || T_REF_VAL_GET(mem) == 0)
usedIMem = false ;
if(!usedMem && !usedIMem)
return ;
// push to qids
SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery));
plq->qId = qInfo->qId;
plq->startExecTs = qInfo->startExecTs;
taosArrayPush(qids, &plq);
}
// longquery
void* qObtainLongQuery(void* param){
SQueryMgmt* qMgmt = (SQueryMgmt*)param;
if(qMgmt == NULL || qMgmt->qinfoPool == NULL)
return NULL;
SArray* qids = taosArrayInit(4, sizeof(int64_t*));
if(qids == NULL) return NULL;
// Get each item
taosCacheRefresh(qMgmt->qinfoPool, cbFoundItem, qids);
size_t cnt = taosArrayGetSize(qids);
if(cnt == 0) {
taosArrayDestroy(qids);
return NULL;
}
if(cnt > 1)
taosArraySort(qids, compareLongQuery);
return qids;
}
//solve tsdb no block to commit
bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) {
SQueryMgmt *pQueryMgmt = pMgmt;
bool fixed = false;
// qid top list
SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt);
if(qids == NULL) return false;
// kill Query
int64_t now = taosGetTimestampMs();
size_t cnt = taosArrayGetSize(qids);
size_t i;
SLongQuery* plq;
for(i=0; i < cnt; i++) {
plq = (SLongQuery* )taosArrayGetP(qids, i);
if(plq->startExecTs > now) continue;
if(now - plq->startExecTs >= longQueryMs) {
qKillQueryByQId(pMgmt, plq->qId, 500, 10); // wait 50*100 ms
if(tsdbNoProblem(pRepo)) {
fixed = true;
qWarn("QId:0x%"PRIx64" fixed problem after kill this query.", plq->qId);
break;
}
}
}
// free qids
for(i=0; i < cnt; i++) {
free(taosArrayGetP(qids, i));
}
taosArrayDestroy(qids);
return fixed;
}
//solve tsdb no block to commit
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) {
qWarn("pRepo=%p start solve problem.", pRepo);
if(qFixedNoBlock(pRepo, pMgmt, 10*60*1000)) {
return true;
}
if(qFixedNoBlock(pRepo, pMgmt, 2*60*1000)){
return true;
}
if(qFixedNoBlock(pRepo, pMgmt, 30*1000)){
return true;
}
qWarn("pRepo=%p solve problem failed.", pRepo);
return false;
}
......@@ -29,6 +29,7 @@ typedef struct {
int tBufBlocks;
int nBufBlocks;
int nRecycleBlocks;
int nElasticBlocks;
int64_t index;
SList* bufBlockList;
} STsdbBufPool;
......@@ -41,6 +42,10 @@ int tsdbOpenBufPool(STsdbRepo* pRepo);
void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
// health cite
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
#endif /* _TD_TSDB_BUFFER_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_HEALTH_H_
#define _TD_TSDB_HEALTH_H_
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
bool tsdbIdleMemEnough();
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
#endif /* _TD_TSDB_BUFFER_H_ */
......@@ -97,6 +97,7 @@ struct STsdbRepo {
SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
pthread_t* pthread;
};
#define REPO_ID(r) (r)->config.tsdbId
......
......@@ -14,12 +14,10 @@
*/
#include "tsdbint.h"
#include "tsdbHealth.h"
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
// ---------------- INTERNAL FUNCTIONS ----------------
STsdbBufPool *tsdbNewBufPool() {
STsdbBufPool *pBufPool = (STsdbBufPool *)calloc(1, sizeof(*pBufPool));
......@@ -65,10 +63,10 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
STsdbBufPool *pPool = pRepo->pPool;
ASSERT(pPool != NULL);
pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0;
pPool->nElasticBlocks = 0;
pPool->index = 0;
pPool->nRecycleBlocks = 0;
......@@ -120,6 +118,18 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) {
if(tsDeadLockKillQuery) {
// supply new Block
if(tsdbInsertNewBlock(pRepo) > 0) {
tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo), pBufPool->nElasticBlocks, pBufPool->bufBlockList->numOfEles);
break;
} else {
// no newBlock, kill query free
if(!tsdbUrgeQueryFree(pRepo))
tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
}
}
pRepo->repoLocked = false;
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
pRepo->repoLocked = true;
......@@ -139,11 +149,11 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
}
// ---------------- LOCAL FUNCTIONS ----------------
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize);
if (pBufBlock == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
return NULL;
}
pBufBlock->blockId = 0;
......@@ -151,13 +161,9 @@ static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
pBufBlock->remain = bufBlockSize;
return pBufBlock;
_err:
tsdbFreeBufBlock(pBufBlock);
return NULL;
}
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
if (oldTotalBlocks == pRepo->config.totalBlocks) {
......@@ -193,10 +199,16 @@ err:
return err;
}
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) {
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic) {
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock);
free(pNode);
pPool->nBufBlocks--;
}
if(bELastic)
{
pPool->nElasticBlocks--;
tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks, pPool->bufBlockList->numOfEles);
}
else
pPool->nBufBlocks--;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tarray.h"
#include "query.h"
#include "tglobal.h"
#include "tlist.h"
#include "tsdbint.h"
#include "tsdbBuffer.h"
#include "tsdbLog.h"
#include "tsdbHealth.h"
#include "ttimer.h"
#include "tthread.h"
// return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
STsdbBufPool *pPool = pRepo->pPool;
int32_t cnt = 0;
if(tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) {
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
// append error
tsdbFreeBufBlock(pBufBlock);
} else {
pPool->nElasticBlocks ++;
cnt ++ ;
}
}
}
return cnt;
}
// switch anther thread to run
void* cbKillQueryFree(void* param) {
STsdbRepo* pRepo = (STsdbRepo*)param;
// vnode
if(pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
}
// free
if(pRepo->pthread){
void* p = pRepo->pthread;
pRepo->pthread = NULL;
free(p);
}
return NULL;
}
// return true do free , false do nothing
bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
// check previous running
if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
return false;
}
// create new
pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
if(pRepo->pthread == NULL) {
tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
return false;
}
return true;
}
bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
int32_t nMaxElastic = pRepo->config.totalBlocks/3;
STsdbBufPool* pPool = pRepo->pPool;
if(pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic);
return false;
}
return true;
}
bool tsdbNoProblem(STsdbRepo* pRepo) {
if(listNEles(pRepo->pPool->bufBlockList) == 0)
return false;
return true;
}
\ No newline at end of file
......@@ -16,6 +16,8 @@
// no test file errors here
#include "taosdef.h"
#include "tsdbint.h"
#include "ttimer.h"
#include "tthread.h"
#define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
......@@ -126,6 +128,10 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
terrno = TSDB_CODE_SUCCESS;
tsdbStopStream(pRepo);
if(pRepo->pthread){
taosDestoryThread(pRepo->pthread);
pRepo->pthread = NULL;
}
if (toCommit) {
tsdbSyncCommit(repo);
......@@ -547,6 +553,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->appH = *pAppH;
}
pRepo->repoLocked = false;
pRepo->pthread = NULL;
int code = pthread_mutex_init(&(pRepo->mutex), NULL);
if (code != 0) {
......
......@@ -99,17 +99,22 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
STsdbBufPool *pBufPool = pRepo->pPool;
SListNode *pNode = NULL;
bool recycleBlocks = pBufPool->nRecycleBlocks > 0;
bool addNew = false;
if (tsdbLockRepo(pRepo) < 0) return -1;
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
if (pBufPool->nRecycleBlocks > 0) {
tsdbRecycleBufferBlock(pBufPool, pNode);
tsdbRecycleBufferBlock(pBufPool, pNode, false);
pBufPool->nRecycleBlocks -= 1;
} else {
tdListAppendNode(pBufPool->bufBlockList, pNode);
if(pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 2) {
tsdbRecycleBufferBlock(pBufPool, pNode, true);
} else {
tdListAppendNode(pBufPool->bufBlockList, pNode);
addNew = true;
}
}
}
if (!recycleBlocks) {
if (addNew) {
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
if (code != 0) {
if (tsdbUnlockRepo(pRepo) < 0) return -1;
......
......@@ -33,6 +33,7 @@ extern "C" {
#endif
typedef void (*__cache_free_fn_t)(void*);
typedef void (*__cache_trav_fn_t)(void*, void*);
typedef struct SCacheStatis {
int64_t missCount;
......@@ -176,7 +177,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj);
* @param fp
* @return
*/
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp);
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1);
/**
* stop background refresh worker thread
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
#define TSDB_CFG_MAX_NUM 116 // 110 + 6 with lossy option
#define TSDB_CFG_MAX_NUM 122
#define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TTHREAD_H
#define TDENGINE_TTHREAD_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param);
// destory thread
bool taosDestoryThread(pthread_t* pthread);
// thread running return true
bool taosThreadRunning(pthread_t* pthread);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TTHREAD_H
......@@ -505,7 +505,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
typedef struct SHashTravSupp {
SCacheObj* pCacheObj;
int64_t time;
__cache_free_fn_t fp;
__cache_trav_fn_t fp;
void* param1;
} SHashTravSupp;
static bool travHashTableEmptyFn(void* param, void* data) {
......@@ -667,17 +668,17 @@ bool travHashTableFn(void* param, void* data) {
}
if (ps->fp) {
(ps->fp)(pNode->data);
(ps->fp)(pNode->data, ps->param1);
}
// do not remove element in hash table
return true;
}
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) {
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_trav_fn_t fp, void* param1) {
assert(pCacheObj != NULL);
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time};
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
}
......@@ -748,7 +749,7 @@ void* taosCacheTimedRefresh(void *handle) {
// refresh data in hash table
if (elemInHash > 0) {
int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, NULL);
doCacheRefresh(pCacheObj, now, NULL, NULL);
}
taosTrashcanEmpty(pCacheObj, false);
......@@ -766,13 +767,13 @@ void* taosCacheTimedRefresh(void *handle) {
return NULL;
}
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) {
if (pCacheObj == NULL) {
return;
}
int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, fp);
doCacheRefresh(pCacheObj, now, fp, param1);
}
void taosStopCacheRefreshWorker(void) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tthread.h"
#include "tglobal.h"
#include "taosdef.h"
#include "tutil.h"
#include "tulog.h"
#include "taoserror.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t));
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int32_t ret = pthread_create(pthread, &thattr, __start_routine, param);
pthread_attr_destroy(&thattr);
if (ret != 0) {
free(pthread);
return NULL;
}
return pthread;
}
// destory thread
bool taosDestoryThread(pthread_t* pthread) {
if(pthread == NULL) return false;
if(taosThreadRunning(pthread)) {
pthread_cancel(*pthread);
pthread_join(*pthread, NULL);
}
free(pthread);
return true;
}
// thread running return true
bool taosThreadRunning(pthread_t* pthread) {
if(pthread == NULL) return false;
int ret = pthread_kill(*pthread, 0);
if(ret == ESRCH)
return false;
if(ret == EINVAL)
return false;
// alive
return true;
}
......@@ -560,5 +560,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
return vnodeSaveVersion(pVnode);
}
// timer thread callback
if(status == TSDB_STATUS_COMMIT_NOBLOCK) {
qSolveCommitNoBlock(pVnode->tsdb, pVnode->qMgmt);
}
return 0;
}
......@@ -12,6 +12,8 @@
# -*- coding: utf-8 -*-
import sys
import numpy as np
from util.log import *
from util.cases import *
from util.sql import *
......@@ -24,8 +26,17 @@ class TDTestCase:
tdSql.init(conn.cursor(), logSql)
def run(self):
# tdSql.query("show variables")
# tdSql.checkData(54, 1, 864000)
tdSql.execute("show variables")
res = tdSql.cursor.fetchall()
resList = np.array(res)
index = np.where(resList == "offlineThreshold")
index_value = np.dstack((index[0])).squeeze()
tdSql.query("show variables")
tdSql.checkData(55, 1, 864000)
tdSql.checkData(index_value, 1, 864000)
pass
def stop(self):
tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册