提交 ca43acde 编写于 作者: A AlexDuan

use new thread replace timer

上级 ff0663f6
......@@ -97,7 +97,7 @@ struct STsdbRepo {
SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
void* tmrCtrl;
pthread_t* pthread;
};
#define REPO_ID(r) (r)->config.tsdbId
......
......@@ -24,6 +24,7 @@
#include "tsdbLog.h"
#include "tsdbHealth.h"
#include "ttimer.h"
#include "tthread.h"
// return malloc new block count
......@@ -47,30 +48,44 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
}
// switch anther thread to run
void cbKillQueryFree(void* param1, void* param2) {
void* cbKillQueryFree(void* param1) {
STsdbRepo* pRepo = (STsdbRepo*)param1;
// vnode
if(pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
}
// free
if(pRepo->pthread){
void* p = pRepo->pthread;
pRepo->pthread = NULL;
free(p);
}
return NULL;
}
// return true do free , false do nothing
bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
// 1 start timer
if(pRepo->tmrCtrl == NULL){
pRepo->tmrCtrl = taosTmrInit(0, 0, 0, "REPO");
// check previous running
if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
return false;
}
tmr_h hTimer = taosTmrStart(cbKillQueryFree, 1, pRepo, pRepo->tmrCtrl);
return hTimer != NULL;
// create new
pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
if(pRepo->pthread == NULL) {
tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
return false;
}
return true;
}
bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
int32_t nMaxElastic = pRepo->config.totalBlocks/3;
STsdbBufPool* pPool = pRepo->pPool;
if(pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", pPool->nElasticBlocks, nMaxElastic);
tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic);
return false;
}
return true;
......
......@@ -17,6 +17,7 @@
#include "taosdef.h"
#include "tsdbint.h"
#include "ttimer.h"
#include "tthread.h"
#define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
......@@ -127,9 +128,9 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
terrno = TSDB_CODE_SUCCESS;
tsdbStopStream(pRepo);
if(pRepo->tmrCtrl){
taosTmrCleanUp(pRepo->tmrCtrl);
pRepo->tmrCtrl = NULL;
if(pRepo->pthread){
taosDestoryThread(pRepo->pthread);
pRepo->pthread = NULL;
}
if (toCommit) {
......@@ -552,7 +553,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->appH = *pAppH;
}
pRepo->repoLocked = false;
pRepo->tmrCtrl = NULL;
pRepo->pthread = NULL;
int code = pthread_mutex_init(&(pRepo->mutex), NULL);
if (code != 0) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TTHREAD_H
#define TDENGINE_TTHREAD_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param);
// destory thread
bool taosDestoryThread(pthread_t* pthread);
// thread running return true
bool taosThreadRunning(pthread_t* pthread);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TTHREAD_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tthread.h"
#include "tglobal.h"
#include "taosdef.h"
#include "tutil.h"
#include "tulog.h"
#include "taoserror.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t));
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int32_t ret = pthread_create(pthread, &thattr, __start_routine, param);
pthread_attr_destroy(&thattr);
if (ret != 0) {
free(pthread);
return NULL;
}
return pthread;
}
// destory thread
bool taosDestoryThread(pthread_t* pthread) {
if(pthread == NULL) return false;
if(taosThreadRunning(pthread)) {
pthread_cancel(*pthread);
pthread_join(*pthread, NULL);
}
free(pthread);
return true;
}
// thread running return true
bool taosThreadRunning(pthread_t* pthread) {
if(pthread == NULL) return false;
int ret = pthread_kill(*pthread, 0);
if(ret == ESRCH)
return false;
if(ret == EINVAL)
return false;
// alive
return true;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册