提交 cebbc071 编写于 作者: S Shengliang Guan

TD-10431 mnode cluster

上级 cf923d18
......@@ -658,7 +658,7 @@ typedef struct {
typedef struct SStatusMsg {
int32_t sver;
int32_t dnodeId;
int64_t clusterId;
int32_t clusterId;
uint32_t rebootTime; // time stamp for last reboot
int16_t numOfCores;
int16_t numOfSupportMnodes;
......@@ -671,9 +671,9 @@ typedef struct SStatusMsg {
typedef struct {
int32_t dnodeId;
int32_t clusterId;
int8_t dropped;
char reserved[3];
int64_t clusterId;
char reserved[7];
} SDnodeCfg;
typedef struct {
......
......@@ -45,7 +45,7 @@ typedef struct SMnodeLoad {
typedef struct {
int32_t dnodeId;
int64_t clusterId;
int32_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
......
......@@ -52,7 +52,7 @@ bool taosGetSysMemory(float *memoryUsedMB);
void taosPrintOsInfo();
int taosSystem(const char *cmd);
void taosKillSystem();
bool taosGetSystemUid(char *uid, int32_t uidlen);
int32_t taosGetSystemUid(char *uid, int32_t uidlen);
char * taosGetCmdlineByPID(int pid);
void taosSetCoreDump(bool enable);
......
......@@ -27,7 +27,7 @@ void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndGetDnodeId(SDnode *pDnode);
int64_t dndGetClusterId(SDnode *pDnode);
int32_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
......
......@@ -57,8 +57,8 @@ typedef struct {
typedef struct {
int32_t dnodeId;
int32_t dropped;
int32_t clusterId;
uint32_t rebootTime;
int64_t clusterId;
SEpSet mnodeEpSet;
char *file;
SHashObj *dnodeHash;
......
......@@ -26,10 +26,10 @@ int32_t dndGetDnodeId(SDnode *pDnode) {
return dnodeId;
}
int64_t dndGetClusterId(SDnode *pDnode) {
int32_t dndGetClusterId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
int64_t clusterId = pMgmt->clusterId;
int32_t clusterId = pMgmt->clusterId;
taosRUnLockLatch(&pMgmt->latch);
return clusterId;
}
......@@ -201,7 +201,7 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
dError("failed to read %s since clusterId not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pMgmt->clusterId = atoll(clusterId->valuestring);
pMgmt->clusterId = atol(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
......@@ -308,7 +308,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pMgmt->dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%d\",\n", pMgmt->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
......@@ -350,7 +350,7 @@ static void dndSendStatusMsg(SDnode *pDnode) {
taosRLockLatch(&pMgmt->latch);
pStatus->sver = htonl(pDnode->opt.sver);
pStatus->dnodeId = htonl(pMgmt->dnodeId);
pStatus->clusterId = htobe64(pMgmt->clusterId);
pStatus->clusterId = htonl(pMgmt->clusterId);
pStatus->rebootTime = htonl(pMgmt->rebootTime);
pStatus->numOfCores = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores);
......@@ -379,7 +379,7 @@ static void dndSendStatusMsg(SDnode *pDnode) {
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) {
dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped);
dInfo("set dnodeId:%d clusterId:%d dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped);
taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId;
......@@ -420,7 +420,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SStatusRsp *pRsp = pMsg->pCont;
SDnodeCfg *pCfg = &pRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId);
pCfg->clusterId = htonl(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg);
if (pCfg->dropped) return;
......
......@@ -58,6 +58,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_MNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_MNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_USE_DB] = dndProcessMnodeWriteMsg;
......@@ -115,12 +117,12 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dndProcessMnodeWriteMsg;
// message from dnode to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_GRANT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_STATUS] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp;
}
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
......
......@@ -115,7 +115,7 @@ typedef struct STrans {
} STrans;
typedef struct SClusterObj {
int64_t id;
int32_t id;
char uid[TSDB_CLUSTER_ID_LEN];
int64_t createdTime;
int64_t updateTime;
......
......@@ -36,7 +36,7 @@ typedef struct {
typedef struct SMnode {
int32_t dnodeId;
int64_t clusterId;
int32_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
......@@ -58,10 +58,6 @@ typedef struct SMnode {
char *charset;
} SMnode;
tmr_h mndGetTimer(SMnode *pMnode);
int32_t mndGetDnodeId(SMnode *pMnode);
int64_t mndGetClusterId(SMnode *pMnode);
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg);
......
......@@ -14,8 +14,96 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mndInt.h"
#include "mndCluster.h"
#include "mndTrans.h"
int32_t mndInitCluster(SMnode *pMnode) { return 0; }
void mndCleanupCluster(SMnode *pMnode) {}
\ No newline at end of file
#define SDB_CLUSTER_VER 1
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, SDB_CLUSTER_VER, sizeof(SClusterObj));
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pCluster->id);
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime)
SDB_SET_BINARY(pRaw, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN)
return pRaw;
}
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_CLUSTER_VER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode cluster since %s", terrstr());
return NULL;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
SClusterObj *pCluster = sdbGetRowObj(pRow);
if (pCluster == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pCluster->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime)
SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN)
return pRow;
}
static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%d, perform insert action", pCluster->id);
return 0;
}
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%d, perform delete action", pCluster->id);
return 0;
}
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pSrcCluster, SClusterObj *pDstCluster) {
mTrace("cluster:%d, perform update action", pSrcCluster->id);
return 0;
}
static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
SClusterObj clusterObj = {0};
clusterObj.createdTime = taosGetTimestampMs();
clusterObj.updateTime = clusterObj.createdTime;
int32_t code = taosGetSystemUid(clusterObj.uid, TSDB_CLUSTER_ID_LEN);
if (code != 0) {
strcpy(clusterObj.uid, "tdengine2.0");
mError("failed to get uid from system, set to default val %s", clusterObj.uid);
} else {
mDebug("cluster:%d, uid is %s", clusterObj.id, clusterObj.uid);
}
clusterObj.id = MurmurHash3_32(clusterObj.uid, TSDB_CLUSTER_ID_LEN);
pMnode->clusterId = clusterObj.id;
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("cluster:%d, will be created while deploy sdb", clusterObj.id);
return sdbWrite(pMnode->pSdb, pRaw);
}
int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CLUSTER,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
.insertFp = (SdbInsertFp)mndClusterActionInsert,
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupCluster(SMnode *pMnode) {}
......@@ -214,7 +214,7 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->sver = htonl(pStatus->sver);
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htobe64(pStatus->clusterId);
pStatus->clusterId = htonl(pStatus->clusterId);
pStatus->rebootTime = htonl(pStatus->rebootTime);
pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes);
......@@ -259,15 +259,14 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return TSDB_CODE_MND_INVALID_MSG_VERSION;
}
int64_t clusterId = mndGetClusterId(pMnode);
if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, clusterId);
mDebug("dnode:%d %s, first access, set clusterId %d", pDnode->id, pDnode->ep, pMnode->clusterId);
} else {
if (pStatus->clusterId != clusterId) {
if (pStatus->clusterId != pMnode->clusterId) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
}
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, clusterId);
mError("dnode:%d, clusterId %d not match exist %d", pDnode->id, pStatus->clusterId, pMnode->clusterId);
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_INVALID_CLUSTER_ID;
} else {
......@@ -306,7 +305,7 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.dropped = 0;
pRsp->dnodeCfg.clusterId = htobe64(clusterId);
pRsp->dnodeCfg.clusterId = htonl(pMnode->clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
pMsg->contLen = contLen;
......
......@@ -203,9 +203,9 @@ static void mndSendTelemetryReport() {
return;
}
int64_t clusterId = mndGetClusterId(NULL);
int32_t clusterId = 0;
char clusterIdStr[20] = {0};
snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId);
snprintf(clusterIdStr, sizeof(clusterIdStr), "%d", clusterId);
SBufferWriter bw = tbufInitWriter(NULL, false);
mndBeginObject(&bw);
......
......@@ -32,30 +32,6 @@
#include "mndUser.h"
#include "mndVgroup.h"
int32_t mndGetDnodeId(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->dnodeId;
}
return -1;
}
int64_t mndGetClusterId(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->clusterId;
}
return -1;
}
tmr_h mndGetTimer(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->timer;
}
return NULL;
}
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendMsgToDnodeFp != NULL) {
(*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
......
......@@ -252,14 +252,14 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) {
void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); }
bool taosGetSystemUid(char *uid) {
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
GUID guid;
CoCreateGuid(&guid);
sprintf(uid, "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X", guid.Data1, guid.Data2, guid.Data3, guid.Data4[0],
guid.Data4[1], guid.Data4[2], guid.Data4[3], guid.Data4[4], guid.Data4[5], guid.Data4[6], guid.Data4[7]);
return true;
return 0;
}
char *taosGetCmdlineByPID(int pid) { return ""; }
......@@ -452,12 +452,12 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
}
}
bool taosGetSystemUid(char *uid) {
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
uuid_t uuid = {0};
uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse_lower(uuid, uid);
return true;
return 0;
}
char *taosGetCmdlineByPID(int pid) {
......@@ -1070,13 +1070,13 @@ void taosSetCoreDump(bool enable) {
#endif
}
bool taosGetSystemUid(char *uid, int32_t uidlen) {
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
int fd;
int len = 0;
fd = open("/proc/sys/kernel/random/uuid", 0);
if (fd < 0) {
return false;
return -1;
} else {
len = read(fd, uid, uidlen);
close(fd);
......@@ -1084,9 +1084,10 @@ bool taosGetSystemUid(char *uid, int32_t uidlen) {
if (len >= 36) {
uid[36] = 0;
return true;
return 0;
}
return false;
return -1;
}
char *taosGetCmdlineByPID(int pid) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册