diff --git a/packaging/deb/nginxd b/packaging/deb/nginxd
new file mode 100644
index 0000000000000000000000000000000000000000..bdc50fb0464df504977249182857f06c2ec2ccbf
--- /dev/null
+++ b/packaging/deb/nginxd
@@ -0,0 +1,46 @@
+#!/bin/bash
+#Startup script for the nginx Web Server
+# chkconfig: 2345 99 01
+# description: Nginx For TDengine Service.
+#
+#
+### BEGIN INIT INFO
+# Provides:          nginx
+# Required-Start:    $local_fs $network $syslog
+# Required-Stop:     $local_fs $network $syslog
+# Default-Start:     2 3 4 5
+# Default-Stop:      0 1 6
+# Short-Description: Starts nginx
+# Description:       Starts nginx.
+### END INIT INFO
+
+nginx=/usr/local/nginxd/sbin/nginx
+case $1 in
+  start)
+    echo -n "Starting Nginx"
+    $nginx
+    echo " done."
+    ;;
+  stop)
+    echo -n "Stopping Nginx"
+    $nginx -s stop
+    echo " done."
+    ;;
+  test)
+    # report success only when the configuration test actually passes
+    $nginx -t && echo "Success."
+    ;;
+  reload)
+    echo -n "Reloading Nginx"
+    $nginx -s reload
+    echo " done."
+    ;;
+  restart)  # NOTE: implemented as a configuration reload, not a stop/start cycle
+    $nginx -s reload
+    echo "reload done."
+    ;;
+  *)
+    echo "Usage: $0 {start|restart|reload|stop|test}"
+    ;;
+esac
+
diff --git a/packaging/rpm/nginxd b/packaging/rpm/nginxd
new file mode 100644
index 0000000000000000000000000000000000000000..fc6593992b297f7fcad03d04051a030dc1898d5e
--- /dev/null
+++ b/packaging/rpm/nginxd
@@ -0,0 +1,46 @@
+#!/bin/bash
+#Startup script for the nginx Web Server
+# chkconfig: 2345 99 01
+# description: Nginx For TDengine Service.
+#
+#
+### BEGIN INIT INFO
+# Provides:          nginx
+# Required-Start:    $local_fs $network $syslog
+# Required-Stop:     $local_fs $network $syslog
+# Default-Start:     2 3 4 5
+# Default-Stop:      0 1 6
+# Short-Description: Starts nginx
+# Description:       Starts nginx.
+### END INIT INFO
+
+nginx=/usr/local/nginxd/sbin/nginx
+case $1 in
+  start)
+    echo -n "Starting Nginx"
+    $nginx
+    echo " done."
+    ;;
+  stop)
+    echo -n "Stopping Nginx"
+    $nginx -s stop
+    echo " done."
+    ;;
+  test)
+    # report success only when the configuration test actually passes
+    $nginx -t && echo "Success."
+    ;;
+  reload)
+    echo -n "Reloading Nginx"
+    $nginx -s reload
+    echo " done."
+    ;;
+  restart)  # NOTE: implemented as a configuration reload, not a stop/start cycle
+    $nginx -s reload
+    echo "reload done."
+    ;;
+  *)
+    echo "Usage: $0 {start|restart|reload|stop|test}"
+    ;;
+esac
+
diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh
index 0d7a7dd9442fd635fce56792dc37c3276d1b955d..3930433b96166c8beb0bddca17aa73ab11c0ebc5 100755
--- a/packaging/tools/install.sh
+++ b/packaging/tools/install.sh
@@ -582,6 +582,26 @@ function clean_service_on_sysvinit() {
     ${csudo} rm -f ${service_config_dir}/taosd || :
     ${csudo} rm -f ${service_config_dir}/tarbitratord || :
 
+    if [ "$verMode" == "cluster" ]; then
+        if pidof nginx &> /dev/null; then
+            ${csudo} service nginxd stop || :
+        fi
+        if ((${initd_mod}==1)); then
+            if [ -e ${service_config_dir}/nginxd ]; then
+                ${csudo} chkconfig --del nginxd || :
+            fi
+        elif ((${initd_mod}==2)); then
+            if [ -e ${service_config_dir}/nginxd ]; then
+                ${csudo} insserv -r nginxd || :
+            fi
+        elif ((${initd_mod}==3)); then
+            if [ -e ${service_config_dir}/nginxd ]; then
+                ${csudo} update-rc.d -f nginxd remove || :
+            fi
+        fi
+        ${csudo} rm -f ${service_config_dir}/nginxd || :
+    fi
+
     if $(which init &> /dev/null); then
         ${csudo} init q || :
     fi
@@ -598,11 +618,19 @@ function install_service_on_sysvinit() {
         ${csudo} cp ${script_dir}/init.d/taosd.deb ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd
         ${csudo} cp -f ${script_dir}/init.d/tarbitratord.deb ${install_main_dir}/init.d/tarbitratord
         ${csudo} cp ${script_dir}/init.d/tarbitratord.deb ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord
+        if [ "$verMode" == "cluster" ]; then
+            ${csudo} cp -f ${script_dir}/init.d/nginxd.deb ${install_main_dir}/init.d/nginxd
+            ${csudo} cp ${script_dir}/init.d/nginxd.deb ${service_config_dir}/nginxd && ${csudo} chmod a+x ${service_config_dir}/nginxd
+        fi
     elif ((${os_type}==2)); then
         ${csudo} cp -f ${script_dir}/init.d/taosd.rpm ${install_main_dir}/init.d/taosd
         ${csudo} cp ${script_dir}/init.d/taosd.rpm ${service_config_dir}/taosd && ${csudo} chmod a+x ${service_config_dir}/taosd
         ${csudo} cp -f ${script_dir}/init.d/tarbitratord.rpm ${install_main_dir}/init.d/tarbitratord
         ${csudo} cp ${script_dir}/init.d/tarbitratord.rpm ${service_config_dir}/tarbitratord && ${csudo} chmod a+x ${service_config_dir}/tarbitratord
+        if [ "$verMode" == "cluster" ]; then
+            ${csudo} cp -f ${script_dir}/init.d/nginxd.rpm ${install_main_dir}/init.d/nginxd
+            ${csudo} cp ${script_dir}/init.d/nginxd.rpm ${service_config_dir}/nginxd && ${csudo} chmod a+x ${service_config_dir}/nginxd
+        fi
     fi
 
     #restart_config_str="taos:2345:respawn:${service_config_dir}/taosd start"
@@ -613,14 +641,26 @@ function install_service_on_sysvinit() {
         ${csudo} chkconfig --level 2345 taosd on || :
         ${csudo} chkconfig --add tarbitratord || :
         ${csudo} chkconfig --level 2345 tarbitratord on || :
+        if [ "$verMode" == "cluster" ]; then
+            ${csudo} chkconfig --add nginxd || :
+            ${csudo} chkconfig --level 2345 nginxd on || :
+            ${csudo} service nginxd start || :
+        fi
     elif ((${initd_mod}==2)); then
         ${csudo} insserv taosd || :
         ${csudo} insserv -d taosd || :
         ${csudo} insserv tarbitratord || :
         ${csudo} insserv -d tarbitratord || :
+        if [ "$verMode" == "cluster" ]; then
+            ${csudo} insserv nginxd || :
+            ${csudo} insserv -d nginxd || :
+        fi
     elif ((${initd_mod}==3)); then
         ${csudo} update-rc.d taosd defaults || :
         ${csudo} update-rc.d tarbitratord defaults || :
+        if [ "$verMode" == "cluster" ]; then
+            ${csudo} update-rc.d nginxd defaults || :
+        fi
     fi
 }
 
@@ -779,7 +819,7 @@ vercomp () {
 
 function is_version_compatible() {
 
-    curr_version=`ls ${script_dir}/driver/libtaos.so* |cut -d '.' -f 3-6`
+    curr_version=`ls ${script_dir}/driver/libtaos.so* | awk -F 'libtaos.so.' '{print $2}'`
 
     if [ -f ${script_dir}/driver/vercomp.txt ]; then
         min_compatible_version=`cat ${script_dir}/driver/vercomp.txt`
diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh
index 39918325992ba140db6db656c6946ec40dd84435..1b11973e3c232f3b00cd0f1d1db8c903e8046da6 100755
--- a/packaging/tools/makepkg.sh
+++ b/packaging/tools/makepkg.sh
@@ -63,6 +63,8 @@ init_file_deb=${script_dir}/../deb/taosd
 init_file_rpm=${script_dir}/../rpm/taosd
 init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord
 init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord
+init_file_nginx_deb=${script_dir}/../deb/nginxd
+init_file_nginx_rpm=${script_dir}/../rpm/nginxd
 
 # make directories.
 mkdir -p ${install_dir}
@@ -73,6 +75,8 @@ mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/taos
 mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/taosd.rpm
 mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || :
 mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || :
+mkdir -p ${install_dir}/init.d && cp ${init_file_nginx_deb} ${install_dir}/init.d/nginxd.deb || :
+mkdir -p ${install_dir}/init.d && cp ${init_file_nginx_rpm} ${install_dir}/init.d/nginxd.rpm || :
 
 if [ -f ${build_dir}/bin/jemalloc-config ]; then
   mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3}
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index 4d4667e135d04b28a3b94952f1c165b5dd16a784..5fbe756b3bd63b0b22bda703ff5b02d4daf82052 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -3142,15 +3142,19 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
     SSqlCmd* pCmd2 = &pSql->rootObj->cmd;
     pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap);
     pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
-    
+    pSql->rootObj->retryReason = pSql->retryReason;
+    SSqlObj *tmpSql = pSql->rootObj;
+    tscFreeSubobj(pSql->rootObj);
+    tfree(tmpSql->pSubs);
+
     SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
     SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
 
     char* n = strdup(name);
     taosArrayPush(pNameList, &n);
 
-    code = getMultiTableMetaFromMnode(pSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
+    code = getMultiTableMetaFromMnode(tmpSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
 
     taosArrayDestroyEx(pNameList, freeElem);
     taosArrayDestroyEx(vgroupList, freeElem);
diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h
index 9775a3898fb759873ffe48673062dd8402873041..ffa89b8c39bea339e7916cc027232e88bfbaa44b 100644
--- a/src/common/inc/tglobal.h
+++ b/src/common/inc/tglobal.h
@@ -223,6 +223,8 @@ extern uint32_t curRange;
 extern char Compressor[];
 #endif
 
+// kill long-running queries when the tsdb buffer pool runs out of blocks (0: off, 1: on)
+extern int8_t tsDeadLockKillQuery;
 
 typedef struct {
   char dir[TSDB_FILENAME_LEN];
diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c
index cd8d6e21d1ac52579a6e23bc953d5efdd5696a1f..50d3fcd1a914bcb6af2629334a3f94daf7d26101 100644
--- a/src/common/src/tglobal.c
+++ b/src/common/src/tglobal.c
@@ -271,6 +271,9 @@ uint32_t curRange = 100; // range
 char Compressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
 #endif
 
+// long query deadlock handling, enabled by default (config option "deadLockKillQuery")
+int8_t tsDeadLockKillQuery = 1;
+
 int32_t (*monStartSystemFp)() = NULL;
 void (*monStopSystemFp)() = NULL;
 void (*monExecuteSQLFp)(char *sql) = NULL;
@@ -1606,6 +1609,17 @@ static void doInitGlobalConfig(void) {
   cfg.unitType = TAOS_CFG_UTYPE_NONE;
   taosInitConfigOption(cfg);
 
+  // enable kill long query
+  cfg.option = "deadLockKillQuery";
+  cfg.ptr = &tsDeadLockKillQuery;
+  cfg.valType = TAOS_CFG_VTYPE_INT8;
+  cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
+  cfg.minValue = 0;
+  cfg.maxValue = 1;
+  cfg.ptrLength = 1;
+  cfg.unitType = TAOS_CFG_UTYPE_NONE;
+  taosInitConfigOption(cfg);
+
 #ifdef TD_TSZ
   // lossy compress
   cfg.option = "lossyColumns";
diff --git a/src/inc/query.h b/src/inc/query.h
index fb9cbff8584892b4a6bc6e4a6ce046a7500aef39..0872e3dbaa517ded77dd758b30e69f273c13a580 100644
--- a/src/inc/query.h
+++ b/src/inc/query.h
@@ -76,6 +76,11 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo);
  */
 int32_t qKillQuery(qinfo_t qinfo);
 
+//kill by qid
+int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount);
+
+bool qSolveCommitNoBlock(void* pRepo, void* pMgmt);
+
 int32_t qQueryCompleted(qinfo_t qinfo);
 
 /**
diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h
index 000703464cfb8c687e473b7559e1048c42f8a6de..0f31b99ac070da2e3c1de0ec8638a7b398bc63a0 100644
--- a/src/inc/taoserror.h
+++ b/src/inc/taoserror.h
@@ -35,6 +35,7 @@ int32_t* taosGetErrno();
 #define terrno                              (*taosGetErrno())
 
 #define TSDB_CODE_SUCCESS                   0
+#define TSDB_CODE_FAILED                    -1   // unknown or needn't tell detail error
 
 // rpc
 #define TSDB_CODE_RPC_ACTION_IN_PROGRESS        TAOS_DEF_ERROR_CODE(0, 0x0001)  //"Action in progress")
diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h
index 7abe3e99c720af1682fc103beec9a5d4caeb09eb..089e30ac3728761c68fe155f960c8650a32c2f7a 100644
--- a/src/inc/tsdb.h
+++ b/src/inc/tsdb.h
@@ -39,6 +39,7 @@ extern "C" {
 
 #define TSDB_STATUS_COMMIT_START 1
 #define TSDB_STATUS_COMMIT_OVER 2
+#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved
 
 // TSDB STATE DEFINITION
 #define TSDB_STATE_OK 0x0
@@ -413,6 +414,11 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
 // For TSDB Compact
 int tsdbCompact(STsdbRepo *pRepo);
 
+// For TSDB Health Monitor
+
+// no problem return true
+bool tsdbNoProblem(STsdbRepo* pRepo);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/kit/shell/src/shellWindows.c b/src/kit/shell/src/shellWindows.c
index d5ddc9d32a8dcd741865823b534c39e7248ae199..c426653ab529405509a4fe7d2c1003a1bd5679b3 100644
--- a/src/kit/shell/src/shellWindows.c
+++ b/src/kit/shell/src/shellWindows.c
@@ -17,7 +17,7 @@
 #include "taos.h"
 #include "shellCommand.h"
 
-#define SHELL_INPUT_MAX_COMMAND_SIZE 500000
+#define SHELL_INPUT_MAX_COMMAND_SIZE 10000
 
 extern char configDir[];
 
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index 4d38cf149d61e877485448fbd482d303afa69832..eb21a8f5c304d922489f90b319c515e4b6144d7c 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -8416,6 +8416,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
   }
 
   pQInfo->qId = qId;
+  pQInfo->startExecTs = 0;
 
   pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
 
diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c
index 28c5df32516ae83fbccecd40208b2735e484a6c4..0ba29ab9047d1b526ed29fd7ec3eb87170f2ae17 100644
--- a/src/query/src/queryMain.c
+++ b/src/query/src/queryMain.c
@@ -35,7 +35,7 @@ typedef struct SQueryMgmt {
   bool            closed;
 } SQueryMgmt;
 
-static void queryMgmtKillQueryFn(void* handle) {
+static void queryMgmtKillQueryFn(void* handle, void* param1) {
   void** fp = (void**)handle;
   qKillQuery(*fp);
 }
@@ -452,7 +452,7 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
   pQueryMgmt->closed = true;
   pthread_mutex_unlock(&pQueryMgmt->lock);
 
-  taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn);
+  taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn, NULL);
 }
 
 void qQueryMgmtReOpen(void *pQMgmt) {
@@ -547,3 +547,157 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
   taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
   return 0;
 }
+
+// Kill the query identified by qId, then poll every waitMs milliseconds until it has no owner.
+// Gives up with TSDB_CODE_FAILED once the poll count exceeds waitCount; returns terrno when the
+// handle cannot be acquired and TSDB_CODE_QRY_INVALID_QHANDLE when the handle is not a valid SQInfo.
+int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
+  int32_t err = TSDB_CODE_SUCCESS;
+  void** handle = qAcquireQInfo(pMgmt, qId);
+  if(handle == NULL) return terrno;
+
+  SQInfo* pQInfo = (SQInfo*)(*handle);
+  if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
+    // give back the reference taken by qAcquireQInfo before bailing out
+    qReleaseQInfo(pMgmt, (void **)&handle, false);
+    return TSDB_CODE_QRY_INVALID_QHANDLE;
+  }
+  qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
+  setQueryKilled(pQInfo);
+
+  // wait query stop
+  int32_t loop = 0;
+  while (pQInfo->owner != 0) {
+    taosMsleep(waitMs);
+    if(loop++ > waitCount){
+      err = TSDB_CODE_FAILED;
+      break;
+    }
+  }
+
+  qReleaseQInfo(pMgmt, (void **)&handle, true);
+  return err;
+}
+
+// one candidate query: its id and the time it started executing
+typedef struct {
+  int64_t qId;
+  int64_t startExecTs;
+} SLongQuery;
+
+// callback for sort compare
+static int compareLongQuery(const void* p1, const void* p2) {
+  // ascending by startExecTs, i.e. the query that started first sorts first
+  SLongQuery* plq1 = *(SLongQuery**)p1;
+  SLongQuery* plq2 = *(SLongQuery**)p2;
+  if(plq1->startExecTs == plq2->startExecTs) {
+    return 0;
+  } else if(plq1->startExecTs > plq2->startExecTs) {
+    return 1;
+  } else {
+    return -1;
+  }
+}
+
+// callback for taosCacheRefresh: append an SLongQuery to the SArray passed in param1
+// for every cached query whose mem/imem snapshot is still referenced
+static void cbFoundItem(void* handle, void* param1) {
+  SQInfo * qInfo = *(SQInfo**) handle;
+  if(qInfo == NULL) return ;
+  SArray* qids = (SArray*) param1;
+  if(qids == NULL) return ;
+
+  bool usedMem = true;
+  bool usedIMem = true;
+  SMemTable* mem = qInfo->query.memRef.snapshot.omem;
+  SMemTable* imem = qInfo->query.memRef.snapshot.imem;
+  if(mem == NULL || T_REF_VAL_GET(mem) == 0)
+    usedMem = false;
+  if(imem == NULL || T_REF_VAL_GET(imem) == 0)
+    usedIMem = false ;
+
+  if(!usedMem && !usedIMem)
+    return ;
+
+  // push to qids
+  SLongQuery* plq = (SLongQuery*)malloc(sizeof(SLongQuery));
+  if(plq == NULL) return ;
+  plq->qId = qInfo->qId;
+  plq->startExecTs = qInfo->startExecTs;
+  taosArrayPush(qids, &plq);
+}
+
+// Collect the queries accepted by cbFoundItem, sorted with compareLongQuery.
+// Returns an SArray of SLongQuery* that the caller must free, or NULL when there is none.
+void* qObtainLongQuery(void* param){
+  SQueryMgmt* qMgmt = (SQueryMgmt*)param;
+  if(qMgmt == NULL || qMgmt->qinfoPool == NULL)
+    return NULL;
+  SArray* qids = taosArrayInit(4, sizeof(SLongQuery*));
+  if(qids == NULL) return NULL;
+  // Get each item
+  taosCacheRefresh(qMgmt->qinfoPool, cbFoundItem, qids);
+
+  size_t cnt = taosArrayGetSize(qids);
+  if(cnt == 0) {
+    taosArrayDestroy(qids);
+    return NULL;
+  }
+  if(cnt > 1)
+    taosArraySort(qids, compareLongQuery);
+
+  return qids;
+}
+
+// Kill candidate queries that have been executing for at least longQueryMs, one at a time,
+// until tsdbNoProblem(pRepo) reports success. Returns true when that happened.
+bool qFixedNoBlock(void* pRepo, void* pMgmt, int32_t longQueryMs) {
+  SQueryMgmt *pQueryMgmt = pMgmt;
+  bool fixed = false;
+
+  // qid top list
+  SArray *qids = (SArray*)qObtainLongQuery(pQueryMgmt);
+  if(qids == NULL) return false;
+
+  // kill Query
+  int64_t now = taosGetTimestampMs();
+  size_t cnt = taosArrayGetSize(qids);
+  size_t i;
+  SLongQuery* plq;
+  for(i=0; i < cnt; i++) {
+    plq = (SLongQuery* )taosArrayGetP(qids, i);
+    if(plq->startExecTs > now) continue;
+    if(now - plq->startExecTs >= longQueryMs) {
+      qKillQueryByQId(pMgmt, plq->qId, 500, 10); // poll every 500 ms, give up after about 10 polls
+      if(tsdbNoProblem(pRepo)) {
+        fixed = true;
+        qWarn("QId:0x%"PRIx64" fixed problem after kill this query.", plq->qId);
+        break;
+      }
+    }
+  }
+
+  // free qids
+  for(i=0; i < cnt; i++) {
+    free(taosArrayGetP(qids, i));
+  }
+  taosArrayDestroy(qids);
+  return fixed;
+}
+
+// Entry point used on TSDB_STATUS_COMMIT_NOBLOCK: try progressively shorter thresholds
+// (10 minutes, 2 minutes, 30 seconds) until one pass of qFixedNoBlock succeeds.
+bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) {
+  qWarn("pRepo=%p start solve problem.", pRepo);
+  if(qFixedNoBlock(pRepo, pMgmt, 10*60*1000)) {
+    return true;
+  }
+  if(qFixedNoBlock(pRepo, pMgmt, 2*60*1000)){
+    return true;
+  }
+  if(qFixedNoBlock(pRepo, pMgmt, 30*1000)){
+    return true;
+  }
+  qWarn("pRepo=%p solve problem failed.", pRepo);
+  return false;
+}
\ No newline at end of file
diff --git a/src/tsdb/inc/tsdbBuffer.h b/src/tsdb/inc/tsdbBuffer.h
index ec6b057aef142fb938993b3a27717c5e64937258..4b650d3993a54f6a98caf00a3605feb37e972ebd 100644
--- a/src/tsdb/inc/tsdbBuffer.h
+++ b/src/tsdb/inc/tsdbBuffer.h
@@ -29,6 +29,7 @@ typedef struct {
   int             tBufBlocks;
   int             nBufBlocks;
   int             nRecycleBlocks;
+  int             nElasticBlocks;
   int64_t         index;
   SList*          bufBlockList;
 } STsdbBufPool;
@@ -41,6 +42,10 @@ int tsdbOpenBufPool(STsdbRepo* pRepo);
 void          tsdbCloseBufPool(STsdbRepo* pRepo);
 SListNode*    tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
 int           tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
-void          tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode);
+void          tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
+
+// exported for the health module (tsdbHealth.c)
+STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
+void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
 
 #endif /* _TD_TSDB_BUFFER_H_ */
diff --git a/src/tsdb/inc/tsdbHealth.h b/src/tsdb/inc/tsdbHealth.h
new file mode 100644
index 0000000000000000000000000000000000000000..324f4312e05fc0ca0200c319728bf692bf476bf6
--- /dev/null
+++ b/src/tsdb/inc/tsdbHealth.h
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _TD_TSDB_HEALTH_H_
+#define _TD_TSDB_HEALTH_H_
+
+// start the background thread that asks the vnode to kill long queries; false if not started
+bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
+// allocate one extra ("elastic") buffer block; returns the number of blocks added (0 or 1)
+int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
+
+// NOTE(review): no definition of tsdbIdleMemEnough is visible in this change -- confirm it exists
+bool tsdbIdleMemEnough();
+// true while the elastic block count is below totalBlocks/3
+bool tsdbAllowNewBlock(STsdbRepo* pRepo);
+
+#endif /* _TD_TSDB_HEALTH_H_ */
diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h
index 3b0937861d6d256f28818157b4517efecd2f11c7..a7f08e61ed1ac576c1ca70c0da20e14f4b3a306f 100644
--- a/src/tsdb/inc/tsdbint.h
+++ b/src/tsdb/inc/tsdbint.h
@@ -97,6 +97,7 @@ struct STsdbRepo {
 
   SMergeBuf       mergeBuf;  //used when update=2
   int8_t          compactState;  // compact state: inCompact/noCompact/waitingCompact?
+  pthread_t*      pthread;  // handle of the urge thread created by tsdbUrgeQueryFree, NULL when none
 };
 
 #define REPO_ID(r) (r)->config.tsdbId
diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c
index e675bf6f9de04021112d43a1db70cf56cf430f08..1faec90bf6fcb0fa48ff0b539e2c6da7cbe6df91 100644
--- a/src/tsdb/src/tsdbBuffer.c
+++ b/src/tsdb/src/tsdbBuffer.c
@@ -14,11 +14,10 @@
  */
 
 #include "tsdbint.h"
+#include "tsdbHealth.h"
 
 #define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
 
-static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
-static void           tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
 
 // ---------------- INTERNAL FUNCTIONS ----------------
 STsdbBufPool *tsdbNewBufPool() {
@@ -69,6 +68,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) {
   pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
   pPool->tBufBlocks = pCfg->totalBlocks;
   pPool->nBufBlocks = 0;
+  pPool->nElasticBlocks = 0;
   pPool->index = 0;
   pPool->nRecycleBlocks = 0;
 
@@ -120,6 +120,18 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
   STsdbBufPool *pBufPool = pRepo->pPool;
 
   while (POOL_IS_EMPTY(pBufPool)) {
+    if(tsDeadLockKillQuery) {
+      // supply new Block
+      if(tsdbInsertNewBlock(pRepo) > 0) {
+        tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo), pBufPool->nElasticBlocks, pBufPool->bufBlockList->numOfEles);
+        break;
+      } else {
+        // no new block could be added: ask for long queries to be killed so blocks get released
+        if(!tsdbUrgeQueryFree(pRepo))
+          tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
+      }
+    }
+
     pRepo->repoLocked = false;
     pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
     pRepo->repoLocked = true;
@@ -139,11 +151,11 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
 }
 
 // ---------------- LOCAL FUNCTIONS ----------------
-static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
+STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
   STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize);
   if (pBufBlock == NULL) {
     terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
-    goto _err;
+    return NULL;
   }
 
   pBufBlock->blockId = 0;
@@ -151,13 +163,9 @@ static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
   pBufBlock->remain = bufBlockSize;
 
   return pBufBlock;
-
-_err:
-  tsdbFreeBufBlock(pBufBlock);
-  return NULL;
 }
 
-static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
+void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
 
 int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
   if (oldTotalBlocks == pRepo->config.totalBlocks) {
@@ -193,10 +201,15 @@ err:
   return err;
 }
 
-void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode) {
+void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic) {
   STsdbBufBlock *pBufBlock = NULL;
   tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
   tsdbFreeBufBlock(pBufBlock);
   free(pNode);
-  pPool->nBufBlocks--;
+  if(bELastic) {
+    pPool->nElasticBlocks--;
+    tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks, pPool->bufBlockList->numOfEles);
+  }
+  else
+    pPool->nBufBlocks--;
 }
diff --git a/src/tsdb/src/tsdbHealth.c b/src/tsdb/src/tsdbHealth.c
new file mode 100644
index 0000000000000000000000000000000000000000..8198c480334912b1ce373ceca7b82409f5a644f2
--- /dev/null
+++ b/src/tsdb/src/tsdbHealth.c
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "os.h"
+#include "taosmsg.h"
+#include "tarray.h"
+#include "query.h"
+#include "tglobal.h"
+#include "tlist.h"
+#include "tsdbint.h"
+#include "tsdbBuffer.h"
+#include "tsdbLog.h"
+#include "tsdbHealth.h"
+#include "ttimer.h"
+#include "tthread.h"
+
+
+// Try to grow the pool by one elastic block (only while tsdbAllowNewBlock permits it).
+// Returns the number of blocks added: 1 on success, 0 otherwise.
+int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
+  STsdbBufPool *pPool = pRepo->pPool;
+  int32_t cnt = 0;
+
+  if(tsdbAllowNewBlock(pRepo)) {
+    STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
+    if (pBufBlock) {
+      if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
+        // append error
+        tsdbFreeBufBlock(pBufBlock);
+      } else {
+        pPool->nElasticBlocks ++;
+        cnt ++ ;
+      }
+    }
+  }
+  return cnt;
+}
+
+// Thread body started by tsdbUrgeQueryFree: runs the notification on another thread.
+// It reports TSDB_STATUS_COMMIT_NOBLOCK through appH.notifyStatus, then releases its own handle.
+void* cbKillQueryFree(void* param) {
+  STsdbRepo* pRepo = (STsdbRepo*)param;
+  // vnode
+  if(pRepo->appH.notifyStatus) {
+    pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
+  }
+
+  // free
+  if(pRepo->pthread){
+    void* p = pRepo->pthread;
+    pRepo->pthread = NULL;
+    // nobody can join this thread once its handle is gone, so detach to let its resources be reclaimed
+    pthread_detach(pthread_self());
+    free(p);
+  }
+
+  return NULL;
+}
+
+// Start the urge thread unless a previous one is still running.
+// return true do free , false do nothing
+bool tsdbUrgeQueryFree(STsdbRepo * pRepo) {
+  // check previous running
+  if(pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
+    tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo), pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
+    return false;
+  }
+  // create new
+  pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
+  if(pRepo->pthread == NULL) {
+    tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
+    return false;
+  }
+  return true;
+}
+
+// Elastic blocks are capped at one third of config.totalBlocks.
+bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
+  int32_t nMaxElastic = pRepo->config.totalBlocks/3;
+  STsdbBufPool* pPool = pRepo->pPool;
+  if(pPool->nElasticBlocks >= nMaxElastic) {
+    tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo), pPool->nElasticBlocks, nMaxElastic);
+    return false;
+  }
+  return true;
+}
+
+// no problem (the free block list is not empty) return true
+bool tsdbNoProblem(STsdbRepo* pRepo) {
+  if(listNEles(pRepo->pPool->bufBlockList) == 0)
+     return false;
+  return true;
+}
\ No newline at end of file
diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c
index b09f6ed4b5a107502f1ee87ba065e05a83d80f1c..39a0543b23d4333dca6f941324976d6bb156907a 100644
--- a/src/tsdb/src/tsdbMain.c
+++ b/src/tsdb/src/tsdbMain.c
@@ -16,6 +16,8 @@
 // no test file errors here
 #include "taosdef.h"
 #include "tsdbint.h"
+#include "ttimer.h"
+#include "tthread.h"
 
 #define IS_VALID_PRECISION(precision) \
   (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
@@ -126,6 +128,10 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
   terrno = TSDB_CODE_SUCCESS;
 
   tsdbStopStream(pRepo);
+  if(pRepo->pthread){
+    taosDestoryThread(pRepo->pthread);
+    pRepo->pthread = NULL;
+  }
 
   if (toCommit) {
     tsdbSyncCommit(repo);
@@ -547,6 +553,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
     pRepo->appH = *pAppH;
   }
   pRepo->repoLocked = false;
+  pRepo->pthread = NULL;
 
   int code = pthread_mutex_init(&(pRepo->mutex), NULL);
   if (code != 0) {
diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c
index 41a83e4643f77f1f79a539140eae886c213b0164..be074bff83ec1c3fb5708fad6aac0afa33684a0e 100644
--- a/src/tsdb/src/tsdbMemTable.c
+++ b/src/tsdb/src/tsdbMemTable.c
@@ -99,17 +99,22 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
     STsdbBufPool *pBufPool = pRepo->pPool;
 
     SListNode *pNode = NULL;
-    bool recycleBlocks = pBufPool->nRecycleBlocks > 0;
+    bool addNew = false;
     if (tsdbLockRepo(pRepo) < 0) return -1;
     while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) {
       if (pBufPool->nRecycleBlocks > 0) {
-        tsdbRecycleBufferBlock(pBufPool, pNode);
+        tsdbRecycleBufferBlock(pBufPool, pNode, false);
         pBufPool->nRecycleBlocks -= 1;
       } else {
-        tdListAppendNode(pBufPool->bufBlockList, pNode);
+        if(pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 2) {
+          tsdbRecycleBufferBlock(pBufPool, pNode, true);
+        } else {
+          tdListAppendNode(pBufPool->bufBlockList, pNode);
+          addNew = true;
+        }
       }
     }
-    if (!recycleBlocks) {
+    if (addNew) {
       int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
       if (code != 0) {
         if (tsdbUnlockRepo(pRepo) < 0) return -1;
diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h
index e41b544d00e55f7eece904c5957ef9c06063e6c3..40069d7d273caa14ce3b80467b25d68ea476fb75 100644
--- a/src/util/inc/tcache.h
+++ b/src/util/inc/tcache.h
@@ -33,6 +33,7 @@ extern "C" {
 #endif
 
 typedef void (*__cache_free_fn_t)(void*);
+typedef void (*__cache_trav_fn_t)(void*, void*);
 
 typedef struct SCacheStatis {
   int64_t missCount;
@@ -176,7 +177,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj);
  * @param fp
  * @return
  */
-void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp);
+void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1);
 
 /**
  * stop background refresh worker thread
diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h
index 2a2ac7dacf378d90bb925e97c32887c487fc3313..e2165ec1a32c3b47a47a4b22243d1ac178d58876 100644
--- a/src/util/inc/tconfig.h
+++ b/src/util/inc/tconfig.h
@@ -20,7 +20,7 @@ extern "C" {
 
 #endif
 
-#define TSDB_CFG_MAX_NUM    121
+#define TSDB_CFG_MAX_NUM    122
 #define TSDB_CFG_PRINT_LEN  23
 #define TSDB_CFG_OPTION_LEN 24
 #define TSDB_CFG_VALUE_LEN  41
diff --git a/src/util/inc/tthread.h b/src/util/inc/tthread.h
new file mode 100644
index 0000000000000000000000000000000000000000..7443ad706dcbef529d857fe823cddd0cc1efbdd3
--- /dev/null
+++ b/src/util/inc/tthread.h
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef TDENGINE_TTHREAD_H
+#define TDENGINE_TTHREAD_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "os.h"
+#include "taosdef.h"
+
+// create a new joinable thread; returns a heap-allocated handle, or NULL on failure
+pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param);
+// destroy thread: cancel and join it if still running, then free the handle
+bool taosDestoryThread(pthread_t* pthread);
+// thread running return true
+bool taosThreadRunning(pthread_t* pthread);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif  // TDENGINE_TTHREAD_H
diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c
index 1bb2c7070dafa875afcf836174f337147a93ae7c..f49dd16c3943040bcc9b58000a87845ea1c87b13 100644
--- a/src/util/src/tcache.c
+++ b/src/util/src/tcache.c
@@ -505,7 +505,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
 typedef struct SHashTravSupp {
   SCacheObj* pCacheObj;
   int64_t    time;
-  __cache_free_fn_t fp;
+  __cache_trav_fn_t fp;
+  void* param1;
 } SHashTravSupp;
 
 static bool travHashTableEmptyFn(void* param, void* data) {
@@ -667,17 +668,17 @@ bool travHashTableFn(void* param, void* data) {
   }
 
   if (ps->fp) {
-    (ps->fp)(pNode->data);
+    (ps->fp)(pNode->data, ps->param1);
   }
 
   // do not remove element in hash table
   return true;
 }
 
-static void doCacheRefresh(SCacheObj* pCacheObj, int64_t utl_time, __cache_free_fn_t fp) {
+static void doCacheRefresh(SCacheObj* pCacheObj, int64_t utl_time, __cache_trav_fn_t fp, void* param1) {
   assert(pCacheObj != NULL);
 
-  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = utl_time};
+  SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = utl_time, .param1 = param1};
   taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
 }
 
@@ -748,7 +749,7 @@ void* taosCacheTimedRefresh(void *handle) {
       // refresh data in hash table
       if (elemInHash > 0) {
         int64_t now = taosGetTimestampMs();
-        doCacheRefresh(pCacheObj, now, NULL);
+        doCacheRefresh(pCacheObj, now, NULL, NULL);
       }
 
       taosTrashcanEmpty(pCacheObj, false);
@@ -766,13 +767,13 @@ void* taosCacheTimedRefresh(void *handle) {
   return NULL;
 }
 
-void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp) {
+void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) {
   if (pCacheObj == NULL) {
     return;
   }
 
   int64_t now = taosGetTimestampMs();
-  doCacheRefresh(pCacheObj, now, fp);
+  doCacheRefresh(pCacheObj, now, fp, param1);
 }
 
 void taosStopCacheRefreshWorker(void) {
diff --git a/src/util/src/tthread.c b/src/util/src/tthread.c
new file mode 100644
index 0000000000000000000000000000000000000000..043b2de2f241297d209041294428dde2c55e974e
--- /dev/null
+++ b/src/util/src/tthread.c
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "os.h"
+#include "tthread.h"
+#include "tglobal.h"
+#include "taosdef.h"
+#include "tutil.h"
+#include "tulog.h"
+#include "taoserror.h"
+
+// create new thread (joinable); returns a heap-allocated handle, or NULL on failure.
+// the handle must later be released with taosDestoryThread
+pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param) {
+  pthread_t* pthread = (pthread_t*)malloc(sizeof(pthread_t));
+  if (pthread == NULL) return NULL;
+  pthread_attr_t thattr;
+  pthread_attr_init(&thattr);
+  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
+  int32_t ret = pthread_create(pthread, &thattr, __start_routine, param);
+  pthread_attr_destroy(&thattr);
+
+  if (ret != 0) {
+    free(pthread);
+    return NULL;
+  }
+  return pthread;
+}
+
+// destroy thread: cancel and join it if it is still running, then free the handle
+bool taosDestoryThread(pthread_t* pthread) {
+  if(pthread == NULL) return false;
+  if(taosThreadRunning(pthread)) {
+    pthread_cancel(*pthread);
+    pthread_join(*pthread, NULL);
+  }
+
+  free(pthread);
+  return true;
+}
+
+// thread running return true
+// NOTE(review): relies on pthread_kill(..., 0) reporting ESRCH for a finished thread -- confirm on the target libc
+bool taosThreadRunning(pthread_t* pthread) {
+  if(pthread == NULL) return false;
+  int ret = pthread_kill(*pthread, 0);
+  if(ret == ESRCH)
+    return false;
+  if(ret == EINVAL)
+    return false;
+  // alive
+  return true;
+}
diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c
index fd61cbef77b03bae7d76be9b74e37a059c64078e..60c60cf531c815023fb18eafcc79d7c41b5cae4f 100644
--- a/src/vnode/src/vnodeMain.c
+++ b/src/vnode/src/vnodeMain.c
@@ -560,5 +560,10 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
     return vnodeSaveVersion(pVnode);
   }
 
+  // timer thread callback
+  if(status == TSDB_STATUS_COMMIT_NOBLOCK) {
+    qSolveCommitNoBlock(pVnode->tsdb, pVnode->qMgmt);
+  }
+
   return 0;
 }
diff --git a/tests/script/unique/dnode/alternativeRole.sim b/tests/script/unique/dnode/alternativeRole.sim
index 955b757f06df22c884565d0fac350456e79cb73a..7e647925d1d3d66d21f279ace852e3fc12496510 100644
--- a/tests/script/unique/dnode/alternativeRole.sim
+++ b/tests/script/unique/dnode/alternativeRole.sim
@@ -32,33 +32,49 @@ sql create dnode $hostname3
 system sh/exec.sh -n dnode3 -s start
 sleep 3000
 
+$x = 0
+show1:
+  $x = $x + 1
+  sleep 1000
+  if $x == 30 then
+    return -1
+  endi
+
 sql show dnodes
 print dnode1 $data5_1
-print dnode1 $data5_2
-print dnode1 $data5_3
+print dnode2 $data5_2
+print dnode3 $data5_3
 if $data5_1 != mnode then
-  return -1
+  goto show1
 endi
 if $data5_2 != vnode then
-  return -1
+  goto show1
 endi
 if $data5_3 != any then
-  return -1
+  goto show1
 endi
 
+$x = 0
+show2:
+  $x = $x + 1
+  sleep 1000
+  if $x == 30 then
+    return -1
+  endi
+
 sql show mnodes
 print dnode1 ==> $data2_1
 print dnode2 ==> $data2_2
 print dnode3 ==> $data2_3
 if $data2_1 != master then
-  return -1
+  goto show2
 endi
 if $data2_2 != null then
-  return -1
+  goto show2
 endi
 if $data2_3 != slave then
-  return -1
+  goto show2
 endi
 
 print ========== step2
 
@@ -72,26 +88,29 @@ sql create table d1.t6 (ts timestamp, i int)
 sql create table d1.t7 (ts timestamp, i int)
 sql create table d1.t8 (ts timestamp, i int)
 
 
+$x = 0
+show3:
+  $x = $x + 1
+  sleep 1000
+  if $x == 30 then
+    return -1
+  endi
+
 sql show dnodes
 print dnode1 $data2_1
 print dnode2 $data2_2
 print dnode3 $data2_3
 if $data2_1 != 0 then
-  return -1
+  goto show3
 endi
 if $data2_2 != 1 then
-  return -1
+  goto show3
 endi
 if $data2_3 != 1 then
-  return -1
+  goto show3
 endi
 
 system sh/exec.sh -n dnode1 -s stop -x SIGINT
 system sh/exec.sh -n dnode2 -s stop -x SIGINT
-system sh/exec.sh -n dnode3 -s stop -x SIGINT
-system sh/exec.sh -n dnode4 -s stop -x SIGINT
-system sh/exec.sh -n dnode5 -s stop -x SIGINT
-system sh/exec.sh -n dnode6 -s stop -x SIGINT
-system sh/exec.sh -n dnode7 -s stop -x SIGINT
-system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
+system sh/exec.sh -n dnode3 -s stop -x SIGINT
\ No newline at end of file