提交 718ccf5d 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/fix/TD-20264' into enh/TD-20288

......@@ -838,6 +838,7 @@ typedef struct {
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs; // ms
} SUseDbReq;
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
......@@ -853,6 +854,7 @@ typedef struct {
int8_t hashMethod;
SArray* pVgroupInfos; // Array of SVgroupInfo
int32_t errCode;
int64_t stateTs; // ms
} SUseDbRsp;
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
......
......@@ -124,6 +124,7 @@ typedef struct SDbVgVersion {
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs;
} SDbVgVersion;
typedef struct STbSVersion {
......
......@@ -2238,6 +2238,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfTable) < 0) return -1;
if (tEncodeI64(&encoder, pReq->stateTs) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -2254,6 +2255,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfTable) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->stateTs) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
......@@ -2490,6 +2492,7 @@ int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) {
}
if (tEncodeI32(pEncoder, pRsp->errCode) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->stateTs) < 0) return -1;
return 0;
}
......@@ -2555,6 +2558,7 @@ int32_t tDeserializeSUseDbRspImp(SDecoder *pDecoder, SUseDbRsp *pRsp) {
}
if (tDecodeI32(pDecoder, &pRsp->errCode) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->stateTs) < 0) return -1;
return 0;
}
......
......@@ -91,7 +91,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......@@ -102,7 +102,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......@@ -116,7 +116,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TRIM_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......@@ -127,7 +127,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_SPLIT_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......@@ -151,7 +151,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
......
......@@ -321,6 +321,7 @@ typedef struct {
int32_t vgVersion;
SDbCfg cfg;
SRWLatch lock;
int64_t stateTs;
} SDbObj;
typedef struct {
......
......@@ -1182,6 +1182,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
pRsp->uid = pDb->uid;
pRsp->vgVersion = pDb->vgVersion;
pRsp->stateTs = pDb->stateTs;
pRsp->vgNum = taosArrayGetSize(pRsp->pVgroupInfos);
pRsp->hashMethod = pDb->cfg.hashMethod;
pRsp->hashPrefix = pDb->cfg.hashPrefix;
......@@ -1234,6 +1235,8 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
goto _OVER;
}
mDebug("db:%s, process usedb req vgVersion:%d stateTs:%" PRId64 ", rsp vgVersion:%d stateTs:%" PRId64,
usedbReq.db, usedbReq.vgVersion, usedbReq.stateTs, usedbRsp.vgVersion, usedbRsp.stateTs);
code = 0;
}
}
......@@ -1290,13 +1293,19 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) {
mInfo("db:%s, version and numOfTable not changed", pDbVgVersion->dbFName);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable /* &&
pDbVgVersion->stateTs == pDb->stateTs */) {
mTrace("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64
" numOfTables:%d, not changed vgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
pDbVgVersion->dbFName, pDbVgVersion->vgVersion, pDbVgVersion->stateTs, pDbVgVersion->numOfTable,
pDb->vgVersion, pDb->stateTs, numOfTable);
mndReleaseDb(pMnode, pDb);
continue;
} else {
mInfo("db:%s, vgroup version changed from %d to %d", pDbVgVersion->dbFName, pDbVgVersion->vgVersion,
pDb->vgVersion);
mInfo("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64
" numOfTables:%d, changed to vgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
pDbVgVersion->dbFName, pDbVgVersion->vgVersion, pDbVgVersion->stateTs, pDbVgVersion->numOfTable,
pDb->vgVersion, pDb->stateTs, numOfTable);
}
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
......@@ -1310,6 +1319,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDb->uid;
usedbRsp.vgVersion = pDb->vgVersion;
usedbRsp.stateTs = pDb->stateTs;
usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos);
usedbRsp.hashMethod = pDb->cfg.hashMethod;
usedbRsp.hashPrefix = pDb->cfg.hashPrefix;
......
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "mndDnode.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndQnode.h"
......@@ -376,8 +377,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
if (pVgroup->vnodeGid[vg].dnodeId == statusReq.dnodeId) {
if (pVgroup->vnodeGid[vg].syncState != pVload->syncState ||
pVgroup->vnodeGid[vg].syncRestore != pVload->syncRestore) {
mTrace("vgId:%d, role changed, old state:%s restored:%d new state:%s restored:%d", pVgroup->vgId,
syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore,
mInfo("vgId:%d, state changed by status msg, old state:%s restored:%d new state:%s restored:%d",
pVgroup->vgId, syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore,
syncStr(pVload->syncState), pVload->syncRestore);
pVgroup->vnodeGid[vg].syncState = pVload->syncState;
pVgroup->vnodeGid[vg].syncRestore = pVload->syncRestore;
......@@ -387,7 +388,12 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
}
if (roleChanged) {
// notify scheduler role has changed
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs);
pDb->stateTs = curMs;
}
mndReleaseDb(pMnode, pDb);
}
}
......
......@@ -139,6 +139,63 @@ static void mndIncreaseUpTime(SMnode *pMnode) {
}
}
static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
bool roleChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
if (pVgroup->vnodeGid[vg].dnodeId == dnodeId) {
if (pVgroup->vnodeGid[vg].syncState != TAOS_SYNC_STATE_ERROR) {
mInfo("vgId:%d, state changed by offline check, old state:%s restored:%d new state:error restored:0",
pVgroup->vgId, syncStr(pVgroup->vnodeGid[vg].syncState), pVgroup->vnodeGid[vg].syncRestore);
pVgroup->vnodeGid[vg].syncState = TAOS_SYNC_STATE_ERROR;
pVgroup->vnodeGid[vg].syncRestore = 0;
roleChanged = true;
}
break;
}
}
if (roleChanged) {
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb != NULL && pDb->stateTs != curMs) {
mInfo("db:%s, stateTs changed by offline check, old newTs:%" PRId64 " newTs:%" PRId64, pDb->name, pDb->stateTs,
curMs);
pDb->stateTs = curMs;
}
mndReleaseDb(pMnode, pDb);
}
sdbRelease(pSdb, pVgroup);
}
}
static void mndCheckDnodeOffline(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
int64_t curMs = taosGetTimestampMs();
void *pIter = NULL;
while (1) {
SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
bool online = mndIsDnodeOnline(pDnode, curMs);
if (!online) {
mInfo("dnode:%d, in offline state", pDnode->id);
mndSetVgroupOffline(pMnode, pDnode->id, curMs);
}
sdbRelease(pSdb, pDnode);
}
}
static void *mndThreadFp(void *param) {
SMnode *pMnode = param;
int64_t lastTime = 0;
......@@ -174,6 +231,10 @@ static void *mndThreadFp(void *param) {
if (sec % tsUptimeInterval == 0) {
mndIncreaseUpTime(pMnode);
}
if (sec % (tsStatusInterval * 5) == 0) {
mndCheckDnodeOffline(pMnode);
}
}
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册