diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 66131451e8574f949d434d04b7f31cfc4a28d35d..b840053b08ac0e7e03872c902cf6c006507f6951 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -838,6 +838,7 @@ typedef struct { int64_t dbId; int32_t vgVersion; int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT + int64_t stateTs; // ms } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); @@ -853,6 +854,7 @@ typedef struct { int8_t hashMethod; SArray* pVgroupInfos; // Array of SVgroupInfo int32_t errCode; + int64_t stateTs; // ms } SUseDbRsp; int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp); diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 3902224f0bd316fab20fc6d5cb860d1b9a4dd160..7717d3bccc17910667fa6f42c6c5e1a556541601 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -124,6 +124,7 @@ typedef struct SDbVgVersion { int64_t dbId; int32_t vgVersion; int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT + int64_t stateTs; } SDbVgVersion; typedef struct STbSVersion { diff --git a/packaging/MPtestJenkinsfile b/packaging/MPtestJenkinsfile index 5b793459164baec2791107842bddb3f0bb90b2df..5dc8024f29ddc0f09727d167aceb3f2516886488 100644 --- a/packaging/MPtestJenkinsfile +++ b/packaging/MPtestJenkinsfile @@ -13,7 +13,7 @@ def sync_source(branch_name) { git branch git pull || git pull git log | head -n 20 - git submodule update --init --recursive + git clean -fxd ''' return 1 } @@ -37,19 +37,24 @@ def build_run() { pipeline { agent none parameters { + choice( + name: 'sourcePath', + choices: ['nas','web'], + description: 'choice which way to download the installation pacakge;web is Office Web and nas means taos nas server ' + ) string ( name:'version', - defaultValue:'3.0.0.1', + defaultValue:'3.0.1.6', description: 'release version number,eg: 3.0.0.1 or 3.0.0.' ) string ( name:'baseVersion', - defaultValue:'3.0.0.1', + defaultValue:'3.0.1.6', description: 'This number of baseVerison is generally not modified.Now it is 3.0.0.1' ) string ( name:'toolsVersion', - defaultValue:'2.1.2', + defaultValue:'2.2.7', description: 'This number of baseVerison is generally not modified.Now it is 3.0.0.1' ) string ( @@ -62,7 +67,7 @@ pipeline { WORK_DIR = '/var/lib/jenkins/workspace' TDINTERNAL_ROOT_DIR = '/var/lib/jenkins/workspace/TDinternal' TDENGINE_ROOT_DIR = '/var/lib/jenkins/workspace/TDinternal/community' - BRANCH_NAME = 'test/chr/TD-14699' + BRANCH_NAME = '3.0' TD_SERVER_TAR = "TDengine-server-${version}-Linux-x64.tar.gz" BASE_TD_SERVER_TAR = "TDengine-server-${baseVersion}-Linux-x64.tar.gz" @@ -104,17 +109,17 @@ pipeline { sync_source("${BRANCH_NAME}") sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' } @@ -127,17 +132,17 @@ pipeline { sync_source("${BRANCH_NAME}") sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py dpkg -r tdengine ''' @@ -152,17 +157,17 @@ pipeline { sync_source("${BRANCH_NAME}") sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' } @@ -175,17 +180,17 @@ pipeline { sync_source("${BRANCH_NAME}") sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py sudo rpm -e tdengine ''' @@ -199,7 +204,7 @@ pipeline { sync_source("${BRANCH_NAME}") sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_ARM_TAR} ${version} ${BASE_TD_SERVER_ARM_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_ARM_TAR} ${version} ${BASE_TD_SERVER_ARM_TAR} ${baseVersion} server ${sourcePath} python3 checkPackageRuning.py ''' } @@ -215,7 +220,7 @@ pipeline { timeout(time: 30, unit: 'MINUTES'){ sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_CLIENT_TAR} ${version} ${BASE_TD_CLIENT_TAR} ${baseVersion} client + bash testpackage.sh ${TD_CLIENT_TAR} ${version} ${BASE_TD_CLIENT_TAR} ${baseVersion} client ${sourcePath} python3 checkPackageRuning.py 192.168.0.21 ''' } @@ -227,7 +232,7 @@ pipeline { timeout(time: 30, unit: 'MINUTES'){ sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_CLIENT_LITE_TAR} ${version} ${BASE_TD_CLIENT_LITE_TAR} ${baseVersion} client + bash testpackage.sh ${TD_CLIENT_LITE_TAR} ${version} ${BASE_TD_CLIENT_LITE_TAR} ${baseVersion} client ${sourcePath} python3 checkPackageRuning.py 192.168.0.24 ''' } @@ -241,7 +246,7 @@ pipeline { timeout(time: 30, unit: 'MINUTES'){ sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_CLIENT_ARM_TAR} ${version} ${BASE_TD_CLIENT_ARM_TAR} ${baseVersion} client + bash testpackage.sh ${TD_CLIENT_ARM_TAR} ${version} ${BASE_TD_CLIENT_ARM_TAR} ${baseVersion} client ${sourcePath} python3 checkPackageRuning.py 192.168.0.21 ''' } diff --git a/packaging/testpackage.sh b/packaging/testpackage.sh index 5b9a12179deb056396e3f08d544a4478d5535005..b0e33835ddf5316d4d8374baaa657176642af181 100755 --- a/packaging/testpackage.sh +++ b/packaging/testpackage.sh @@ -6,6 +6,8 @@ version=$2 originPackageName=$3 originversion=$4 testFile=$5 +# sourcePath:web/nas +sourcePath=$6 subFile="taos.tar.gz" # Color setting @@ -71,16 +73,23 @@ fi function wgetFile { file=$1 +versionPath=$2 +sourceP=$3 +nasServerIP="192.168.1.131" +packagePath="/nas/TDengine/v${versionPath}/community" +if [ -f ${file} ];then + echoColor YD "${file} already exists ,it will delete it and download it again " + rm -rf ${file} +fi -if [ ! -f ${file} ];then - echoColor BD "wget https://www.taosdata.com/assets-download/3.0/${file}" +if [ ${sourceP} = 'web' ];then + echoColor BD "====download====:wget https://www.taosdata.com/assets-download/3.0/${file}" wget https://www.taosdata.com/assets-download/3.0/${file} -else - echoColor YD "${file} already exists and use new file " - rm -rf ${file} - echoColor BD "wget https://www.taosdata.com/assets-download/3.0/${file}" - wget https://www.taosdata.com/assets-download/3.0/${file} +elif [ ${sourceP} = 'nas' ];then + echoColor BD "====download====:scp root@${nasServerIP}:${packagePath}/${file} ." + scp root@${nasServerIP}:${packagePath}/${file} . fi + } function newPath { @@ -142,8 +151,9 @@ if [ -d ${installPath}/${tdPath} ] ;then fi echoColor G "===== download installPackage =====" -cd ${installPath} && wgetFile ${packgeName} -cd ${oriInstallPath} && wgetFile ${originPackageName} +cd ${installPath} && wgetFile ${packgeName} ${version} ${sourcePath} +cd ${oriInstallPath} && wgetFile ${originPackageName} ${originversion} ${sourcePath} + cd ${installPath} cp -r ${scriptDir}/debRpmAutoInstall.sh . @@ -193,7 +203,7 @@ elif [[ ${packgeName} =~ "tar" ]];then cd ${oriInstallPath} if [ ! -f {originPackageName} ];then echoColor YD "download base installPackage" - wgetFile ${originPackageName} + wgetFile ${originPackageName} ${originversion} ${sourcePath} fi echoColor YD "unzip the base installation package" echoColor BD "tar -xf ${originPackageName}" && tar -xf ${originPackageName} @@ -238,14 +248,19 @@ cd ${installPath} if [[ ${packgeName} =~ "Lite" ]] || ([[ ${packgeName} =~ "x64" ]] && [[ ${packgeName} =~ "client" ]]) || ([[ ${packgeName} =~ "deb" ]] && [[ ${packgeName} =~ "server" ]]) || ([[ ${packgeName} =~ "rpm" ]] && [[ ${packgeName} =~ "server" ]]) ;then echoColor G "===== install taos-tools when package is lite or client =====" cd ${installPath} - wgetFile taosTools-2.1.3-Linux-x64.tar.gz . - tar xf taosTools-2.1.3-Linux-x64.tar.gz + if [ ! -f "taosTools-2.1.3-Linux-x64.tar.gz " ];then + wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web + tar xf taosTools-2.1.3-Linux-x64.tar.gz + fi cd taosTools-2.1.3 && bash install-taostools.sh elif ([[ ${packgeName} =~ "arm64" ]] && [[ ${packgeName} =~ "client" ]]);then echoColor G "===== install taos-tools arm when package is arm64-client =====" cd ${installPath} - wgetFile taosTools-2.1.3-Linux-arm64.tar.gz . - tar xf taosTools-2.1.3-Linux-arm64.tar.gz + if [ ! -f "taosTools-2.1.3-Linux-x64.tar.gz " ];then + wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web + tar xf taosTools-2.1.3-Linux-arm64.tar.gz + fi + cd taosTools-2.1.3 && bash install-taostools.sh fi diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 795b1145c8c32363e1e27b4bb92f2cdc20e803ca..2eb94773e9ed4ad0fe509d35b461dd5f5450639a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2238,6 +2238,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfTable) < 0) return -1; + if (tEncodeI64(&encoder, pReq->stateTs) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2254,6 +2255,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfTable) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->stateTs) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); @@ -2490,6 +2492,7 @@ int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) { } if (tEncodeI32(pEncoder, pRsp->errCode) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->stateTs) < 0) return -1; return 0; } @@ -2555,6 +2558,7 @@ int32_t tDeserializeSUseDbRspImp(SDecoder *pDecoder, SUseDbRsp *pRsp) { } if (tDecodeI32(pDecoder, &pRsp->errCode) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->stateTs) < 0) return -1; return 0; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 976f2cfd8baa221e940830d8a8d99e9e840833ce..ec4f42c847b5cc7099ec4a1b2388e0a313b9e3b0 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -91,7 +91,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -102,7 +102,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -116,7 +116,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TRIM_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -127,7 +127,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -151,7 +151,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 74a92c9fcd1e021ccef13c86556b96a228e02feb..04ac5aba49dcc1622c81201c88524a7b1eed8591 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -321,6 +321,7 @@ typedef struct { int32_t vgVersion; SDbCfg cfg; SRWLatch lock; + int64_t stateTs; } SDbObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 3d9b91dce8a96760be93f08257eea9485f8b4df0..f2a4462bf5d7eaaadc176361e25a8721c1a63784 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1182,6 +1182,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); pRsp->uid = pDb->uid; pRsp->vgVersion = pDb->vgVersion; + pRsp->stateTs = pDb->stateTs; pRsp->vgNum = taosArrayGetSize(pRsp->pVgroupInfos); pRsp->hashMethod = pDb->cfg.hashMethod; pRsp->hashPrefix = pDb->cfg.hashPrefix; @@ -1234,6 +1235,8 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) { goto _OVER; } + mDebug("db:%s, process usedb req vgVersion:%d stateTs:%" PRId64 ", rsp vgVersion:%d stateTs:%" PRId64, + usedbReq.db, usedbReq.vgVersion, usedbReq.stateTs, usedbRsp.vgVersion, usedbRsp.stateTs); code = 0; } } @@ -1290,13 +1293,19 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); - if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) { - mInfo("db:%s, version and numOfTable not changed", pDbVgVersion->dbFName); + if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable /* && + pDbVgVersion->stateTs == pDb->stateTs */) { + mTrace("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64 + " numOfTables:%d, not changed vgVersion:%d stateTs:%" PRId64 " numOfTables:%d", + pDbVgVersion->dbFName, pDbVgVersion->vgVersion, pDbVgVersion->stateTs, pDbVgVersion->numOfTable, + pDb->vgVersion, pDb->stateTs, numOfTable); mndReleaseDb(pMnode, pDb); continue; } else { - mInfo("db:%s, vgroup version changed from %d to %d", pDbVgVersion->dbFName, pDbVgVersion->vgVersion, - pDb->vgVersion); + mInfo("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64 + " numOfTables:%d, changed to vgVersion:%d stateTs:%" PRId64 " numOfTables:%d", + pDbVgVersion->dbFName, pDbVgVersion->vgVersion, pDbVgVersion->stateTs, pDbVgVersion->numOfTable, + pDb->vgVersion, pDb->stateTs, numOfTable); } usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo)); @@ -1310,6 +1319,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN); usedbRsp.uid = pDb->uid; usedbRsp.vgVersion = pDb->vgVersion; + usedbRsp.stateTs = pDb->stateTs; usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos); usedbRsp.hashMethod = pDb->cfg.hashMethod; usedbRsp.hashPrefix = pDb->cfg.hashPrefix; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 75ae6f467b8187ad4ce69426b9ad8f787985a7ba..521f924fad89eb3a169044e6dce27b41a5487770 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "mndDnode.h" +#include "mndDb.h" #include "mndMnode.h" #include "mndPrivilege.h" #include "mndQnode.h" @@ -356,7 +357,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode->lastAccessTime = curMs; const STraceId *trace = &pReq->info.traceId; mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, - pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq); + pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq); for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) { SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v); @@ -376,9 +377,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { if (pVgroup->vnodeGid[vg].dnodeId == statusReq.dnodeId) { if (pVgroup->vnodeGid[vg].syncState != pVload->syncState || pVgroup->vnodeGid[vg].syncRestore != pVload->syncRestore) { - mTrace("vgId:%d, role changed, old state:%s restored:%d new state:%s restored:%d", pVgroup->vgId, - syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore, - syncStr(pVload->syncState), pVload->syncRestore); + mInfo("vgId:%d, state changed by status msg, old state:%s restored:%d new state:%s restored:%d", + pVgroup->vgId, syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore, + syncStr(pVload->syncState), pVload->syncRestore); pVgroup->vnodeGid[vg].syncState = pVload->syncState; pVgroup->vnodeGid[vg].syncRestore = pVload->syncRestore; roleChanged = true; @@ -387,7 +388,12 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { } } if (roleChanged) { - // notify scheduler role has changed + SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); + if (pDb != NULL && pDb->stateTs != curMs) { + mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs); + pDb->stateTs = curMs; + } + mndReleaseDb(pMnode, pDb); } } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 1b2d85bd29df73dd99814f91fc6b4d7a53f3f70f..dcfc046a1eb8fde76e04fd23ba523ad0ea8994b3 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -139,6 +139,63 @@ static void mndIncreaseUpTime(SMnode *pMnode) { } } +static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) { + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + bool roleChanged = false; + for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { + if (pVgroup->vnodeGid[vg].dnodeId == dnodeId) { + if (pVgroup->vnodeGid[vg].syncState != TAOS_SYNC_STATE_ERROR) { + mInfo("vgId:%d, state changed by offline check, old state:%s restored:%d new state:error restored:0", + pVgroup->vgId, syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore); + pVgroup->vnodeGid[vg].syncState = TAOS_SYNC_STATE_ERROR; + pVgroup->vnodeGid[vg].syncRestore = 0; + roleChanged = true; + } + break; + } + } + + if (roleChanged) { + SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); + if (pDb != NULL && pDb->stateTs != curMs) { + mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs, + curMs); + pDb->stateTs = curMs; + } + mndReleaseDb(pMnode, pDb); + } + + sdbRelease(pSdb, pVgroup); + } +} + +static void mndCheckDnodeOffline(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + int64_t curMs = taosGetTimestampMs(); + + void *pIter = NULL; + while (1) { + SDnodeObj *pDnode = NULL; + pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); + if (pIter == NULL) break; + + bool online = mndIsDnodeOnline(pDnode, curMs); + if (!online) { + mInfo("dnode:%d, in offline state", pDnode->id); + mndSetVgroupOffline(pMnode, pDnode->id, curMs); + } + + sdbRelease(pSdb, pDnode); + } +} + static void *mndThreadFp(void *param) { SMnode *pMnode = param; int64_t lastTime = 0; @@ -174,6 +231,10 @@ static void *mndThreadFp(void *param) { if (sec % tsUptimeInterval == 0) { mndIncreaseUpTime(pMnode); } + + if (sec % (tsStatusInterval * 5) == 0) { + mndCheckDnodeOffline(pMnode); + } } return NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 631cbd415ffea512f1447a69f4e362d0b023c7b9..0fa7cd2296ce0300a3b5d43ebb722fa657019aab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -366,9 +366,9 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) { tMapDataClear(&p->mapData); } -static void destroyAllBlockScanInfo(SHashObj* pTableMap) { +static void destroyAllBlockScanInfo(SHashObj* pTableMap, bool clearEntry) { void* p = NULL; - while ((p = taosHashIterate(pTableMap, p)) != NULL) { + while (clearEntry && ((p = taosHashIterate(pTableMap, p)) != NULL)) { clearBlockScanInfo(*(STableBlockScanInfo**)p); } @@ -3768,7 +3768,7 @@ void tsdbReaderClose(STsdbReader* pReader) { cleanupDataBlockIterator(&pReader->status.blockIter); size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); - destroyAllBlockScanInfo(pReader->status.pTableMap); + destroyAllBlockScanInfo(pReader->status.pTableMap, (pReader->innerReader[0] == NULL) ? true : false); blockDataDestroy(pReader->pResBlock); clearBlockScanInfoBuf(&pReader->blockInfoBuf); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index efb9a49df28c7ec0b3a4433b017e4d2da2b09ac1..370267d2191185c30c0491c0489fa94f27b1684e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3383,6 +3383,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); + if (NULL == pOperator) { + pTaskInfo->code = terrno; + return NULL; + } + STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { @@ -3403,6 +3408,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); + if (NULL == pOperator) { + pTaskInfo->code = terrno; + return NULL; + } STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 0af8f5075e6dccb7f2889f807edc775485a2d228..249ba1815de6067274d2cdd6a5d8782cfcd47d66 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -66,16 +66,36 @@ static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNod return pSubplan; } -static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, ESubplanType subplanType) { +static bool splHasScan(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + return true; + } + + SNode* pChild = NULL; + FOREACH(pChild, pNode->pChildren) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) { + return true; + } + return splHasScan((SLogicNode*)pChild); + } + + return false; +} + +static void splSetSubplanType(SLogicSubplan* pSubplan) { + pSubplan->subplanType = splHasScan(pSubplan->pNode) ? SUBPLAN_TYPE_SCAN : SUBPLAN_TYPE_MERGE; +} + +static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) { SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); if (NULL == pSubplan) { return NULL; } pSubplan->id.queryId = pCxt->queryId; pSubplan->id.groupId = pCxt->groupId; - pSubplan->subplanType = subplanType; pSubplan->pNode = pNode; pNode->pParent = NULL; + splSetSubplanType(pSubplan); return pSubplan; } @@ -1204,7 +1224,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl SNode* pChild = NULL; FOREACH(pChild, pSplitNode->pChildren) { - SLogicSubplan* pNewSubplan = splCreateSubplan(pCxt, (SLogicNode*)pChild, pUnionSubplan->subplanType); + SLogicSubplan* pNewSubplan = splCreateSubplan(pCxt, (SLogicNode*)pChild); code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan); if (TSDB_CODE_SUCCESS == code) { REPLACE_NODE(NULL); @@ -1390,10 +1410,9 @@ static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { SLogicSubplan* pNewSubplan = NULL; SNodeList* pSubplanChildren = info.pSubplan->pChildren; - ESubplanType subplanType = info.pSubplan->subplanType; int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, SUBPLAN_TYPE_MODIFY); if (TSDB_CODE_SUCCESS == code) { - pNewSubplan = splCreateSubplan(pCxt, info.pQueryRoot, subplanType); + pNewSubplan = splCreateSubplan(pCxt, info.pQueryRoot); if (NULL == pNewSubplan) { code = TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 44df1ddda280b985281e991c9124d33183f8eabb..df9a818fee5bc79fe324b35c92ec306acfe110fa 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -912,8 +912,8 @@ int32_t filterDetachCnfGroups(SArray *group, SArray *left, SArray *right) { if (taosArrayGetSize(left) <= 0) { if (taosArrayGetSize(right) <= 0) { - fltError("both groups are empty"); - FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + fltDebug("both groups are empty"); + return TSDB_CODE_SUCCESS; } SFilterGroup *gp = NULL; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 98677c64d3a31af89cd5f8d2c2817d9f0625c5d0..c3abfa971ec70a173b1f8d38634e29163bc276b5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -474,7 +474,7 @@ void* destroyConnPool(void* pool) { } static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { - char key[32] = {0}; + char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); @@ -525,7 +525,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->status = ConnInPool; if (conn->list == NULL) { - char key[32] = {0}; + char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); conn->list = taosHashGet((SHashObj*)pool, key, strlen(key));