提交 b58af485 编写于 作者: C Cary Xu

Merge branch 'master' into hotfix/TS-774

#!/bin/bash
#Startup script for the nginx Web Server
# chkconfig: 2345 99 01
# description: Nginx For TDengine Service.
#
#
### BEGIN INIT INFO
# Provides: nginx
# Required-Start: $local_fs $network $syslog
# Required-Stop: $local_fs $network $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Starts nginx
# Description: Starts nginx.
### END INIT INFO
nginx=/usr/local/nginxd/sbin/nginx
case $1 in
start)
echo -n "Starting Nginx"
$nginx
echo " done."
;;
stop)
echo -n "Stopping Nginx"
$nginx -s stop
echo " done."
;;
test)
$nginx -t
echo "Success."
;;
reload)
echo -n "Reloading Nginx"
$nginx -s reload
echo " done."
;;
restart)
$nginx -s reload
echo "reload done."
;;
*)
echo "Usage: $0 {start|restart|reload|stop|test|show}"
;;
esac
#!/bin/bash
#Startup script for the nginx Web Server
# chkconfig: 2345 99 01
#
#
### BEGIN INIT INFO
# Provides: nginx
# Provides: nginx
# Required-Start: $local_fs $network $syslog
# Required-Stop: $local_fs $network $syslog
# Short-Description: Starts nginx
# Description: Starts nginx.
### END INIT INFO
nginx=/usr/local/nginxd/sbin/nginx
case $1 in
start)
echo -n "Starting Nginx"
$nginx
echo " done."
;;
stop)
echo -n "Stopping Nginx"
$nginx -s stop
echo " done."
;;
test)
$nginx -t
echo "Success."
;;
reload)
echo -n "Reloading Nginx"
$nginx -s reload
echo " done."
;;
restart)
$nginx -s reload
echo "reload done."
;;
*)
echo "Usage: $0 {start|restart|reload|stop|test|show}"
;;
esac
...@@ -582,6 +582,26 @@ function clean_service_on_sysvinit() { ...@@ -582,6 +582,26 @@ function clean_service_on_sysvinit() {
${csudo} rm -f ${service_config_dir}/taosd || : ${csudo} rm -f ${service_config_dir}/taosd || :
${csudo} rm -f ${service_config_dir}/tarbitratord || : ${csudo} rm -f ${service_config_dir}/tarbitratord || :
if [ "$verMode" == "cluster" ]; then\
if pidof nginxd ; then
${csudo} service nginxd stop || :
fi
if ((${initd_mod}==1)); then
if [ -e ${service_config_dir}/nginxd ]; then
${csudo} chkconfig --del nginxd || :
fi
elif ((${initd_mod}==2)); then
if [ -e ${service_config_dir}/nginxd ]; then
${csudo} insserv -r nginxd || :
fi
elif ((${initd_mod}==3)); then
if [ -e ${service_config_dir}/nginxd ]; then
${csudo} update-rc.d -f nginxd remove || :
fi
fi
${csudo} rm -f ${service_config_dir}/nginxd || :
fi
if $(which init &> /dev/null); then if $(which init &> /dev/null); then
${csudo} init q || : ${csudo} init q || :
fi fi
...@@ -598,11 +618,19 @@ function install_service_on_sysvinit() { ...@@ -598,11 +618,19 @@ function install_service_on_sysvinit() {
${csudo} cp ${script_dir}/init.d/taosd.deb ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd ${csudo} cp ${script_dir}/init.d/taosd.deb ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd
${csudo} cp -f ${script_dir}/init.d/tarbitratord.deb ${install_main_dir}/init.d/tarbitratord ${csudo} cp -f ${script_dir}/init.d/tarbitratord.deb ${install_main_dir}/init.d/tarbitratord
${csudo} cp ${script_dir}/init.d/tarbitratord.deb ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord ${csudo} cp ${script_dir}/init.d/tarbitratord.deb ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord
if [ "$verMode" == "cluster" ]; then
${csudo} cp -f ${script_dir}/init.d/nginxd.deb ${install_main_dir}/init.d/nginxd
${csudo} cp ${script_dir}/init.d/nginxd.deb ${service_config_dir}/nginxd && ${csudo} chmod a+x ${service_config_dir}/nginxd
fi
elif ((${os_type}==2)); then elif ((${os_type}==2)); then
${csudo} cp -f ${script_dir}/init.d/taosd.rpm ${install_main_dir}/init.d/taosd ${csudo} cp -f ${script_dir}/init.d/taosd.rpm ${install_main_dir}/init.d/taosd
${csudo} cp ${script_dir}/init.d/taosd.rpm ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd ${csudo} cp ${script_dir}/init.d/taosd.rpm ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd
${csudo} cp -f ${script_dir}/init.d/tarbitratord.rpm ${install_main_dir}/init.d/tarbitratord ${csudo} cp -f ${script_dir}/init.d/tarbitratord.rpm ${install_main_dir}/init.d/tarbitratord
${csudo} cp ${script_dir}/init.d/tarbitratord.rpm ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord ${csudo} cp ${script_dir}/init.d/tarbitratord.rpm ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord
if [ "$verMode" == "cluster" ]; then
${csudo} cp -f ${script_dir}/init.d/nginxd.rpm ${install_main_dir}/init.d/nginxd
${csudo} cp ${script_dir}/init.d/nginxd.rpm ${service_config_dir}/nginxd && ${csudo} chmod a+x ${service_config_dir}/nginxd
fi
fi fi
#restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start" #restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
...@@ -613,14 +641,26 @@ function install_service_on_sysvinit() { ...@@ -613,14 +641,26 @@ function install_service_on_sysvinit() {
${csudo} chkconfig --level 2345 taosd on || : ${csudo} chkconfig --level 2345 taosd on || :
${csudo} chkconfig --add tarbitratord || : ${csudo} chkconfig --add tarbitratord || :
${csudo} chkconfig --level 2345 tarbitratord on || : ${csudo} chkconfig --level 2345 tarbitratord on || :
if [ "$verMode" == "cluster" ]; then
${csudo} chkconfig --add nginxd || :
${csudo} chkconfig --level 0123456 nginxd on || :
${csudo} service nginxd start
fi
elif ((${initd_mod}==2)); then elif ((${initd_mod}==2)); then
${csudo} insserv taosd || : ${csudo} insserv taosd || :
${csudo} insserv -d taosd || : ${csudo} insserv -d taosd || :
${csudo} insserv tarbitratord || : ${csudo} insserv tarbitratord || :
${csudo} insserv -d tarbitratord || : ${csudo} insserv -d tarbitratord || :
if [ "$verMode" == "cluster" ]; then
${csudo} insserv nginxd || :
${csudo} insserv -d nginxd || :
fi
elif ((${initd_mod}==3)); then elif ((${initd_mod}==3)); then
${csudo} update-rc.d taosd defaults || : ${csudo} update-rc.d taosd defaults || :
${csudo} update-rc.d tarbitratord defaults || : ${csudo} update-rc.d tarbitratord defaults || :
if [ "$verMode" == "cluster" ]; then
${csudo} update-rc.d nginxd defaults || :
fi
fi fi
} }
...@@ -779,7 +819,7 @@ vercomp () { ...@@ -779,7 +819,7 @@ vercomp () {
function is_version_compatible() { function is_version_compatible() {
curr_version=`ls ${script_dir}/driver/libtaos.so* |cut -d '.' -f 3-6` curr_version=`ls ${script_dir}/driver/libtaos.so* | awk -F 'libtaos.so.' '{print $2}'`
if [ -f ${script_dir}/driver/vercomp.txt ]; then if [ -f ${script_dir}/driver/vercomp.txt ]; then
min_compatible_version=`cat ${script_dir}/driver/vercomp.txt` min_compatible_version=`cat ${script_dir}/driver/vercomp.txt`
......
...@@ -63,6 +63,8 @@ init_file_deb=${script_dir}/../deb/taosd ...@@ -63,6 +63,8 @@ init_file_deb=${script_dir}/../deb/taosd
init_file_rpm=${script_dir}/../rpm/taosd init_file_rpm=${script_dir}/../rpm/taosd
init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord
init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord
init_file_nginx_deb=${script_dir}/../deb/nginxd
init_file_nginx_rpm=${script_dir}/../rpm/nginxd
# make directories. # make directories.
mkdir -p ${install_dir} mkdir -p ${install_dir}
...@@ -73,6 +75,8 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/taos ...@@ -73,6 +75,8 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/taos
mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/taosd.rpm mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/taosd.rpm
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || : mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || :
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || : mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || :
mkdir -p ${install_dir}/init.d && cp ${init_file_nginx_deb} ${install_dir}/init.d/nginxd.deb || :
mkdir -p ${install_dir}/init.d && cp ${init_file_nginx_rpm} ${install_dir}/init.d/nginxd.rpm || :
if [ -f ${build_dir}/bin/jemalloc-config ]; then if [ -f ${build_dir}/bin/jemalloc-config ]; then
mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3} mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3}
......
...@@ -3145,12 +3145,16 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { ...@@ -3145,12 +3145,16 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
pSql->rootObj->retryReason = pSql->retryReason; pSql->rootObj->retryReason = pSql->retryReason;
SSqlObj *tmpSql = pSql->rootObj;
tscFreeSubobj(pSql->rootObj);
tfree(tmpSql->pSubs);
SArray* pNameList = taosArrayInit(1, POINTER_BYTES); SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
char* n = strdup(name); char* n = strdup(name);
taosArrayPush(pNameList, &n); taosArrayPush(pNameList, &n);
code = getMultiTableMetaFromMnode(pSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); code = getMultiTableMetaFromMnode(tmpSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
taosArrayDestroyEx(pNameList, freeElem); taosArrayDestroyEx(pNameList, freeElem);
taosArrayDestroyEx(vgroupList, freeElem); taosArrayDestroyEx(vgroupList, freeElem);
......
...@@ -223,6 +223,8 @@ extern uint32_t curRange; ...@@ -223,6 +223,8 @@ extern uint32_t curRange;
extern char Compressor[]; extern char Compressor[];
#endif #endif
// long query
extern int8_t tsDeadLockKillQuery;
typedef struct { typedef struct {
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
......
...@@ -271,6 +271,9 @@ uint32_t curRange = 100; // range ...@@ -271,6 +271,9 @@ uint32_t curRange = 100; // range
char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
#endif #endif
// long query death-lock
int8_t tsDeadLockKillQuery = 1;
int32_t (*monStartSystemFp)() = NULL; int32_t (*monStartSystemFp)() = NULL;
void (*monStopSystemFp)() = NULL; void (*monStopSystemFp)() = NULL;
void (*monExecuteSQLFp)(char *sql) = NULL; void (*monExecuteSQLFp)(char *sql) = NULL;
...@@ -1606,6 +1609,17 @@ static void doInitGlobalConfig(void) { ...@@ -1606,6 +1609,17 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
// enable kill long query
cfg.option = "deadLockKillQuery";
cfg.ptr = &tsDeadLockKillQuery;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -76,6 +76,11 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo); ...@@ -76,6 +76,11 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo);
*/ */
int32_t qKillQuery(qinfo_t qinfo); int32_t qKillQuery(qinfo_t qinfo);
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount);
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt);
int32_t qQueryCompleted(qinfo_t qinfo); int32_t qQueryCompleted(qinfo_t qinfo);
/** /**
......
...@@ -35,6 +35,7 @@ int32_t* taosGetErrno(); ...@@ -35,6 +35,7 @@ int32_t* taosGetErrno();
#define terrno (*taosGetErrno()) #define terrno (*taosGetErrno())
#define TSDB_CODE_SUCCESS 0 #define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
// rpc // rpc
#define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //"Action in progress") #define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //"Action in progress")
......
...@@ -39,6 +39,7 @@ extern "C" { ...@@ -39,6 +39,7 @@ extern "C" {
#define TSDB_STATUS_COMMIT_START 1 #define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2 #define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved
// TSDB STATE DEFINITION // TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0 #define TSDB_STATE_OK 0x0
...@@ -413,6 +414,11 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd); ...@@ -413,6 +414,11 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact // For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo); int tsdbCompact(STsdbRepo *pRepo);
// For TSDB Health Monitor
// no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "taos.h" #include "taos.h"
#include "shellCommand.h" #include "shellCommand.h"
#define SHELL_INPUT_MAX_COMMAND_SIZE 500000 #define SHELL_INPUT_MAX_COMMAND_SIZE 10000
extern char configDir[]; extern char configDir[];
......
...@@ -8416,6 +8416,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -8416,6 +8416,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
} }
pQInfo->qId = qId; pQInfo->qId = qId;
pQInfo->startExecTs = 0;
pQInfo->runtimeEnv.pUdfInfo = pUdfInfo; pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
......
...@@ -35,7 +35,7 @@ typedef struct SQueryMgmt { ...@@ -35,7 +35,7 @@ typedef struct SQueryMgmt {
bool closed; bool closed;
} SQueryMgmt; } SQueryMgmt;
static void queryMgmtKillQueryFn(void* handle) { static void queryMgmtKillQueryFn(void* handle, void* param1) {
void** fp = (void**)handle; void** fp = (void**)handle;
qKillQuery(*fp); qKillQuery(*fp);
} }
...@@ -452,7 +452,7 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) { ...@@ -452,7 +452,7 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
pQueryMgmt->closed = true; pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock); pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn); taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn, NULL);
} }
void qQueryMgmtReOpen(void *pQMgmt) { void qQueryMgmtReOpen(void *pQMgmt) {
...@@ -547,3 +547,148 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) { ...@@ -547,3 +547,148 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle); taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
return 0; return 0;
} }
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
int32_t err = TSDB_CODE_SUCCESS;
void** handle = qAcquireQInfo(pMgmt, qId);
if(handle == NULL) return terrno;
SQInfo* pQInfo = (SQInfo*)(*handle);
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
setQueryKilled(pQInfo);
// wait query stop
int32_t loop = 0;
while (pQInfo->owner != 0) {
taosMsleep(waitMs);
if(loop++ > waitCount){
err = TSDB_CODE_FAILED;
break;
}
}
qReleaseQInfo(pMgmt, (void **)&handle, true);
return err;
}
// local struct
typedef struct {
int64_t qId;
int64_t startExecTs;
} SLongQuery;
// callbark for sort compare
static int compareLongQuery(const void* p1, const void* p2) {
// sort desc
SLongQuery* plq1 = *(SLongQuery**)p1;
SLongQuery* plq2 = *(SLongQuery**)p2;
if(plq1->startExecTs == plq2->startExecTs) {
return 0;
} else if(plq1->startExecTs > plq2->startExecTs) {
return 1;
} else {
return -1;
}
}
// callback for taosCacheRefresh
static void cbFoundItem(void* handle, void* param1) {
SQInfo * qInfo = *(SQInfo**) handle;
if(qInfo == NULL) return ;
SArray* qids = (SArray*) param1;
if(qids == NULL) return ;
bool usedMem = true;
bool usedIMem = true;
SMemTable* mem = qInfo->query.memRef.snapshot.omem;
SMemTable* imem = qInfo->query.memRef.snapshot.imem;
if(mem == NULL || T_REF_VAL_GET(mem) == 0)
usedMem = false;
if(imem == NULL || T_REF_VAL_GET(mem) == 0)
usedIMem = false ;
if(!usedMem && !usedIMem)
return ;
// push to qids
SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery));
plq->qId = qInfo->qId;
plq->startExecTs = qInfo->startExecTs;
taosArrayPush(qids, &plq);
}
// longquery
void* qObtainLongQuery(void* param){
SQueryMgmt* qMgmt = (SQueryMgmt*)param;
if(qMgmt == NULL || qMgmt->qinfoPool == NULL)
return NULL;
SArray* qids = taosArrayInit(4, sizeof(int64_t*));
if(qids == NULL) return NULL;
// Get each item
taosCacheRefresh(qMgmt->qinfoPool, cbFoundItem, qids);
size_t cnt = taosArrayGetSize(qids);
if(cnt == 0) {
taosArrayDestroy(qids);
return NULL;
}
if(cnt > 1)
taosArraySort(qids, compareLongQuery);
return qids;
}
//solve tsdb no block to commit
bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) {
SQueryMgmt *pQueryMgmt = pMgmt;
bool fixed = false;
// qid top list
SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt);
if(qids == NULL) return false;
// kill Query
int64_t now = taosGetTimestampMs();
size_t cnt = taosArrayGetSize(qids);
size_t i;
SLongQuery* plq;
for(i=0; i < cnt; i++) {
plq = (SLongQuery* )taosArrayGetP(qids, i);
if(plq->startExecTs > now) continue;
if(now - plq->startExecTs >= longQueryMs) {
qKillQueryByQId(pMgmt, plq->qId, 500, 10); // wait 50*100 ms
if(tsdbNoProblem(pRepo)) {
fixed = true;
qWarn("QId:0x%"PRIx64" fixed problem after kill this query.", plq->qId);
break;
}
}
}
// free qids
for(i=0; i < cnt; i++) {
free(taosArrayGetP(qids, i));
}
taosArrayDestroy(qids);
return fixed;
}
//solve tsdb no block to commit
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) {
qWarn("pRepo=%p start solve problem.", pRepo);
if(qFixedNoBlock(pRepo, pMgmt, 10*60*1000)) {
return true;
}
if(qFixedNoBlock(pRepo, pMgmt, 2*60*1000)){
return true;
}
if(qFixedNoBlock(pRepo, pMgmt, 30*1000)){
return true;
}
qWarn("pRepo=%p solve problem failed.", pRepo);
return false;
}
\ No newline at end of file
...@@ -29,6 +29,7 @@ typedef struct { ...@@ -29,6 +29,7 @@ typedef struct {
int tBufBlocks; int tBufBlocks;
int nBufBlocks; int nBufBlocks;
int nRecycleBlocks; int nRecycleBlocks;
int nElasticBlocks;
int64_t index; int64_t index;
SList* bufBlockList; SList* bufBlockList;
} STsdbBufPool; } STsdbBufPool;
...@@ -41,6 +42,10 @@ int tsdbOpenBufPool(STsdbRepo* pRepo); ...@@ -41,6 +42,10 @@ int tsdbOpenBufPool(STsdbRepo* pRepo);
void tsdbCloseBufPool(STsdbRepo* pRepo); void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode); void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
// health cite
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
#endif /* _TD_TSDB_BUFFER_H_ */ #endif /* _TD_TSDB_BUFFER_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_HEALTH_H_
#define _TD_TSDB_HEALTH_H_
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
bool tsdbIdleMemEnough();
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
#endif /* _TD_TSDB_BUFFER_H_ */
...@@ -97,6 +97,7 @@ struct STsdbRepo { ...@@ -97,6 +97,7 @@ struct STsdbRepo {
SMergeBuf mergeBuf; //used when update=2 SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
pthread_t* pthread;
}; };
#define REPO_ID(r) (r)->config.tsdbId #define REPO_ID(r) (r)->config.tsdbId
......
...@@ -14,11 +14,10 @@ ...@@ -14,11 +14,10 @@
*/ */
#include "tsdbint.h" #include "tsdbint.h"
#include "tsdbHealth.h"
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0) #define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
STsdbBufPool *tsdbNewBufPool() { STsdbBufPool *tsdbNewBufPool() {
...@@ -69,6 +68,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { ...@@ -69,6 +68,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
pPool->tBufBlocks = pCfg->totalBlocks; pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0; pPool->nBufBlocks = 0;
pPool->nElasticBlocks = 0;
pPool->index = 0; pPool->index = 0;
pPool->nRecycleBlocks = 0; pPool->nRecycleBlocks = 0;
...@@ -120,6 +120,18 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { ...@@ -120,6 +120,18 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) { while (POOL_IS_EMPTY(pBufPool)) {
if(tsDeadLockKillQuery) {
// supply new Block
if(tsdbInsertNewBlock(pRepo) > 0) {
tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo), pBufPool->nElasticBlocks, pBufPool->bufBlockList->numOfEles);
break;
} else {
// no newBlock, kill query free
if(!tsdbUrgeQueryFree(pRepo))
tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
}
}
pRepo->repoLocked = false; pRepo->repoLocked = false;
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex)); pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
pRepo->repoLocked = true; pRepo->repoLocked = true;
...@@ -139,11 +151,11 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { ...@@ -139,11 +151,11 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
} }
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize); STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize);
if (pBufBlock == NULL) { if (pBufBlock == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; return NULL;
} }
pBufBlock->blockId = 0; pBufBlock->blockId = 0;
...@@ -151,13 +163,9 @@ static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { ...@@ -151,13 +163,9 @@ static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
pBufBlock->remain = bufBlockSize; pBufBlock->remain = bufBlockSize;
return pBufBlock; return pBufBlock;
_err:
tsdbFreeBufBlock(pBufBlock);
return NULL;
} }
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
if (oldTotalBlocks == pRepo->config.totalBlocks) { if (oldTotalBlocks == pRepo->config.totalBlocks) {
...@@ -193,10 +201,15 @@ err: ...@@ -193,10 +201,15 @@ err:
return err; return err;
} }
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) { void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic) {
STsdbBufBlock *pBufBlock = NULL; STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock)); tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock); tsdbFreeBufBlock(pBufBlock);
free(pNode); free(pNode);
if(bELastic) {
pPool->nElasticBlocks--;
tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks, pPool->bufBlockList->numOfEles);
}
else
pPool->nBufBlocks--; pPool->nBufBlocks--;
} }
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tarray.h"
#include "query.h"
#include "tglobal.h"
#include "tlist.h"
#include "tsdbint.h"
#include "tsdbBuffer.h"
#include "tsdbLog.h"
#include "tsdbHealth.h"
#include "ttimer.h"
#include "tthread.h"
// return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
STsdbBufPool *pPool = pRepo->pPool;
int32_t cnt = 0;
if(tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) {
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
// append error
tsdbFreeBufBlock(pBufBlock);
} else {
pPool->nElasticBlocks ++;
cnt ++ ;
}
}
}
return cnt;
}
// switch anther thread to run
void* cbKillQueryFree(void* param) {
STsdbRepo* pRepo = (STsdbRepo*)param;
// vnode
if(pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
}
// free
if(pRepo->pthread){
void* p = pRepo->pthread;
pRepo->pthread = NULL;
free(p);
}
return NULL;
}
// return true do free , false do nothing
bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
// check previous running
if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
return false;
}
// create new
pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
if(pRepo->pthread == NULL) {
tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
return false;
}
return true;
}
bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
int32_t nMaxElastic = pRepo->config.totalBlocks/3;
STsdbBufPool* pPool = pRepo->pPool;
if(pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic);
return false;
}
return true;
}
bool tsdbNoProblem(STsdbRepo* pRepo) {
if(listNEles(pRepo->pPool->bufBlockList) == 0)
return false;
return true;
}
\ No newline at end of file
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
// no test file errors here // no test file errors here
#include "taosdef.h" #include "taosdef.h"
#include "tsdbint.h" #include "tsdbint.h"
#include "ttimer.h"
#include "tthread.h"
#define IS_VALID_PRECISION(precision) \ #define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
...@@ -126,6 +128,10 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) { ...@@ -126,6 +128,10 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
tsdbStopStream(pRepo); tsdbStopStream(pRepo);
if(pRepo->pthread){
taosDestoryThread(pRepo->pthread);
pRepo->pthread = NULL;
}
if (toCommit) { if (toCommit) {
tsdbSyncCommit(repo); tsdbSyncCommit(repo);
...@@ -547,6 +553,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -547,6 +553,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->appH = *pAppH; pRepo->appH = *pAppH;
} }
pRepo->repoLocked = false; pRepo->repoLocked = false;
pRepo->pthread = NULL;
int code = pthread_mutex_init(&(pRepo->mutex), NULL); int code = pthread_mutex_init(&(pRepo->mutex), NULL);
if (code != 0) { if (code != 0) {
......
...@@ -99,17 +99,22 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -99,17 +99,22 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
SListNode *pNode = NULL; SListNode *pNode = NULL;
bool recycleBlocks = pBufPool->nRecycleBlocks > 0; bool addNew = false;
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
if (pBufPool->nRecycleBlocks > 0) { if (pBufPool->nRecycleBlocks > 0) {
tsdbRecycleBufferBlock(pBufPool, pNode); tsdbRecycleBufferBlock(pBufPool, pNode, false);
pBufPool->nRecycleBlocks -= 1; pBufPool->nRecycleBlocks -= 1;
} else {
if(pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 2) {
tsdbRecycleBufferBlock(pBufPool, pNode, true);
} else { } else {
tdListAppendNode(pBufPool->bufBlockList, pNode); tdListAppendNode(pBufPool->bufBlockList, pNode);
addNew = true;
}
} }
} }
if (!recycleBlocks) { if (addNew) {
int code = pthread_cond_signal(&pBufPool->poolNotEmpty); int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
if (code != 0) { if (code != 0) {
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
......
...@@ -33,6 +33,7 @@ extern "C" { ...@@ -33,6 +33,7 @@ extern "C" {
#endif #endif
typedef void (*__cache_free_fn_t)(void*); typedef void (*__cache_free_fn_t)(void*);
typedef void (*__cache_trav_fn_t)(void*, void*);
typedef struct SCacheStatis { typedef struct SCacheStatis {
int64_t missCount; int64_t missCount;
...@@ -176,7 +177,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj); ...@@ -176,7 +177,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj);
* @param fp * @param fp
* @return * @return
*/ */
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp); void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1);
/** /**
* stop background refresh worker thread * stop background refresh worker thread
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 121 #define TSDB_CFG_MAX_NUM 122
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TTHREAD_H
#define TDENGINE_TTHREAD_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param);
// destory thread
bool taosDestoryThread(pthread_t* pthread);
// thread running return true
bool taosThreadRunning(pthread_t* pthread);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TTHREAD_H
...@@ -505,7 +505,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -505,7 +505,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
typedef struct SHashTravSupp { typedef struct SHashTravSupp {
SCacheObj* pCacheObj; SCacheObj* pCacheObj;
int64_t time; int64_t time;
__cache_free_fn_t fp; __cache_trav_fn_t fp;
void* param1;
} SHashTravSupp; } SHashTravSupp;
static bool travHashTableEmptyFn(void* param, void* data) { static bool travHashTableEmptyFn(void* param, void* data) {
...@@ -667,17 +668,17 @@ bool travHashTableFn(void* param, void* data) { ...@@ -667,17 +668,17 @@ bool travHashTableFn(void* param, void* data) {
} }
if (ps->fp) { if (ps->fp) {
(ps->fp)(pNode->data); (ps->fp)(pNode->data, ps->param1);
} }
// do not remove element in hash table // do not remove element in hash table
return true; return true;
} }
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t utl_time, __cache_free_fn_t fp) { static void doCacheRefresh(SCacheObj* pCacheObj, int64_t utl_time, __cache_trav_fn_t fp, void* param1) {
assert(pCacheObj != NULL); assert(pCacheObj != NULL);
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = utl_time}; SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = utl_time, .param1 = param1};
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup); taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
} }
...@@ -748,7 +749,7 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -748,7 +749,7 @@ void* taosCacheTimedRefresh(void *handle) {
// refresh data in hash table // refresh data in hash table
if (elemInHash > 0) { if (elemInHash > 0) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, NULL); doCacheRefresh(pCacheObj, now, NULL, NULL);
} }
taosTrashcanEmpty(pCacheObj, false); taosTrashcanEmpty(pCacheObj, false);
...@@ -766,13 +767,13 @@ void* taosCacheTimedRefresh(void *handle) { ...@@ -766,13 +767,13 @@ void* taosCacheTimedRefresh(void *handle) {
return NULL; return NULL;
} }
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) { void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) {
if (pCacheObj == NULL) { if (pCacheObj == NULL) {
return; return;
} }
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
doCacheRefresh(pCacheObj, now, fp); doCacheRefresh(pCacheObj, now, fp, param1);
} }
void taosStopCacheRefreshWorker(void) { void taosStopCacheRefreshWorker(void) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tthread.h"
#include "tglobal.h"
#include "taosdef.h"
#include "tutil.h"
#include "tulog.h"
#include "taoserror.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t));
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int32_t ret = pthread_create(pthread, &thattr, __start_routine, param);
pthread_attr_destroy(&thattr);
if (ret != 0) {
free(pthread);
return NULL;
}
return pthread;
}
// destory thread
bool taosDestoryThread(pthread_t* pthread) {
if(pthread == NULL) return false;
if(taosThreadRunning(pthread)) {
pthread_cancel(*pthread);
pthread_join(*pthread, NULL);
}
free(pthread);
return true;
}
// thread running return true
bool taosThreadRunning(pthread_t* pthread) {
if(pthread == NULL) return false;
int ret = pthread_kill(*pthread, 0);
if(ret == ESRCH)
return false;
if(ret == EINVAL)
return false;
// alive
return true;
}
...@@ -560,5 +560,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { ...@@ -560,5 +560,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
return vnodeSaveVersion(pVnode); return vnodeSaveVersion(pVnode);
} }
// timer thread callback
if(status == TSDB_STATUS_COMMIT_NOBLOCK) {
qSolveCommitNoBlock(pVnode->tsdb, pVnode->qMgmt);
}
return 0; return 0;
} }
...@@ -32,33 +32,48 @@ sql create dnode $hostname3 ...@@ -32,33 +32,48 @@ sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start system sh/exec.sh -n dnode3 -s start
sleep 3000 sleep 3000
$x = 0
show1:
$x = $x + 1
sleep 1000
if $x == 30 then
return -1
endi
sql show dnodes sql show dnodes
print dnode1 $data5_1 print dnode1 $data5_1
print dnode1 $data5_2 print dnode2 $data5_2
print dnode1 $data5_3 print dnode3 $data5_3
if $data5_1 != mnode then if $data5_1 != mnode then
return -1 goto show1
endi endi
if $data5_2 != vnode then if $data5_2 != vnode then
return -1 goto show1
endi endi
if $data5_3 != any then if $data5_3 != any then
return -1 goto show1
endi endi
show2:
$x = $x + 1
sleep 1000
if $x == 30 then
return -1
endi
sql show mnodes sql show mnodes
print dnode1 ==> $data2_1 print dnode1 ==> $data2_1
print dnode2 ==> $data2_2 print dnode2 ==> $data2_2
print dnode3 ==> $data2_3 print dnode3 ==> $data2_3
if $data2_1 != master then if $data2_1 != master then
return -1 goto show2
endi endi
if $data2_2 != null then if $data2_2 != null then
return -1 goto show2
endi endi
if $data2_3 != slave then if $data2_3 != slave then
return -1 goto show2
endi endi
print ========== step2 print ========== step2
...@@ -72,26 +87,28 @@ sql create table d1.t6 (ts timestamp, i int) ...@@ -72,26 +87,28 @@ sql create table d1.t6 (ts timestamp, i int)
sql create table d1.t7 (ts timestamp, i int) sql create table d1.t7 (ts timestamp, i int)
sql create table d1.t8 (ts timestamp, i int) sql create table d1.t8 (ts timestamp, i int)
show3:
$x = $x + 1
sleep 1000
if $x == 30 then
return -1
endi
sql show dnodes sql show dnodes
print dnode1 $data2_1 print dnode1 $data2_1
print dnode2 $data2_2 print dnode2 $data2_2
print dnode3 $data2_3 print dnode3 $data2_3
if $data2_1 != 0 then if $data2_1 != 0 then
return -1 goto show3
endi endi
if $data2_2 != 1 then if $data2_2 != 1 then
return -1 goto show3
endi endi
if $data2_3 != 1 then if $data2_3 != 1 then
return -1 goto show3
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册