提交 bb3a0b21 编写于 作者: S Shengliang Guan

change clusterid from int32 to int64

上级 791c6c84
...@@ -320,7 +320,7 @@ typedef struct SEpSet { ...@@ -320,7 +320,7 @@ typedef struct SEpSet {
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
uint32_t clusterId; int64_t clusterId;
int32_t connId; int32_t connId;
int8_t superUser; int8_t superUser;
int8_t reserved[5]; int8_t reserved[5];
...@@ -644,7 +644,7 @@ typedef struct { ...@@ -644,7 +644,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t sver; int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int64_t clusterId;
int64_t rebootTime; int64_t rebootTime;
int64_t updateTime; int64_t updateTime;
int16_t numOfCores; int16_t numOfCores;
...@@ -660,7 +660,7 @@ typedef struct { ...@@ -660,7 +660,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int64_t clusterId;
int8_t dropped; int8_t dropped;
char reserved[7]; char reserved[7];
} SDnodeCfg; } SDnodeCfg;
......
...@@ -59,7 +59,7 @@ typedef struct SAppInstInfo { ...@@ -59,7 +59,7 @@ typedef struct SAppInstInfo {
SCorEpSet mgmtEp; SCorEpSet mgmtEp;
SInstanceActivity summary; SInstanceActivity summary;
SList *pConnList; // STscObj linked list SList *pConnList; // STscObj linked list
uint32_t clusterId; int64_t clusterId;
void *pTransporter; void *pTransporter;
} SAppInstInfo; } SAppInstInfo;
......
...@@ -170,8 +170,8 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { ...@@ -170,8 +170,8 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
if (pDcl->msgType == TDMT_MND_CREATE_TABLE) { if (pDcl->msgType == TDMT_MND_CREATE_TABLE) {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
char buf[12] = {0}; char buf[18] = {0};
sprintf(buf, "%d", pRequest->pTscObj->pAppInfo->clusterId); sprintf(buf, "%" PRId64, pRequest->pTscObj->pAppInfo->clusterId);
int32_t code = catalogGetHandle(buf, &pCatalog); int32_t code = catalogGetHandle(buf, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -59,7 +59,7 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { ...@@ -59,7 +59,7 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
SConnectRsp *pConnect = (SConnectRsp *)pMsg; SConnectRsp *pConnect = (SConnectRsp *)pMsg;
pConnect->acctId = htonl(pConnect->acctId); pConnect->acctId = htonl(pConnect->acctId);
pConnect->connId = htonl(pConnect->connId); pConnect->connId = htonl(pConnect->connId);
pConnect->clusterId = htonl(pConnect->clusterId); pConnect->clusterId = htobe64(pConnect->clusterId);
assert(pConnect->epSet.numOfEps > 0); assert(pConnect->epSet.numOfEps > 0);
for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
...@@ -82,7 +82,8 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { ...@@ -82,7 +82,8 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
pRequest->body.resInfo.pRspMsg = pMsg; pRequest->body.resInfo.pRspMsg = pMsg;
tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns); tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
pTscObj->pAppInfo->numOfConns);
return 0; return 0;
} }
......
...@@ -307,7 +307,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { ...@@ -307,7 +307,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId); len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": %d,\n", pMgmt->clusterId); len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
...@@ -348,7 +348,7 @@ void dndSendStatusMsg(SDnode *pDnode) { ...@@ -348,7 +348,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
pStatus->sver = htonl(pDnode->opt.sver); pStatus->sver = htonl(pDnode->opt.sver);
pStatus->dnodeId = htonl(pMgmt->dnodeId); pStatus->dnodeId = htonl(pMgmt->dnodeId);
pStatus->clusterId = htonl(pMgmt->clusterId); pStatus->clusterId = htobe64(pMgmt->clusterId);
pStatus->rebootTime = htobe64(pMgmt->rebootTime); pStatus->rebootTime = htobe64(pMgmt->rebootTime);
pStatus->updateTime = htobe64(pMgmt->updateTime); pStatus->updateTime = htobe64(pMgmt->updateTime);
pStatus->numOfCores = htons(pDnode->opt.numOfCores); pStatus->numOfCores = htons(pDnode->opt.numOfCores);
...@@ -378,7 +378,7 @@ void dndSendStatusMsg(SDnode *pDnode) { ...@@ -378,7 +378,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) { if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) {
dInfo("set dnodeId:%d clusterId:%d dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); dInfo("set dnodeId:%d clusterId:% " PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped);
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId; pMgmt->dnodeId = pCfg->dnodeId;
...@@ -424,7 +424,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -424,7 +424,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SStatusRsp *pRsp = pMsg->pCont; SStatusRsp *pRsp = pMsg->pCont;
SDnodeCfg *pCfg = &pRsp->dnodeCfg; SDnodeCfg *pCfg = &pRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htonl(pCfg->clusterId); pCfg->clusterId = htobe64(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg); dndUpdateDnodeCfg(pDnode, pCfg);
if (pCfg->dropped) { if (pCfg->dropped) {
......
...@@ -111,7 +111,7 @@ typedef struct { ...@@ -111,7 +111,7 @@ typedef struct {
} STrans; } STrans;
typedef struct { typedef struct {
int32_t id; int64_t id;
char name[TSDB_CLUSTER_ID_LEN]; char name[TSDB_CLUSTER_ID_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
......
...@@ -70,7 +70,7 @@ typedef struct { ...@@ -70,7 +70,7 @@ typedef struct {
typedef struct SMnode { typedef struct SMnode {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int64_t clusterId;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
......
...@@ -67,7 +67,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { ...@@ -67,7 +67,7 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pCluster->id); SDB_SET_INT64(pRaw, dataPos, pCluster->id);
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime) SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime) SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN) SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN)
...@@ -91,7 +91,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { ...@@ -91,7 +91,7 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
if (pCluster == NULL) return NULL; if (pCluster == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pCluster->id) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime)
SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN) SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN)
...@@ -101,17 +101,17 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { ...@@ -101,17 +101,17 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
} }
static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) { static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%d, perform insert action", pCluster->id); mTrace("cluster:%" PRId64 ", perform insert action", pCluster->id);
return 0; return 0;
} }
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%d, perform delete action", pCluster->id); mTrace("cluster:%" PRId64 ", perform delete action", pCluster->id);
return 0; return 0;
} }
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster) { static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster) {
mTrace("cluster:%d, perform update action", pOldCluster->id); mTrace("cluster:%" PRId64 ", perform update action", pOldCluster->id);
return 0; return 0;
} }
...@@ -125,17 +125,17 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -125,17 +125,17 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
strcpy(clusterObj.name, "tdengine2.0"); strcpy(clusterObj.name, "tdengine2.0");
mError("failed to get name from system, set to default val %s", clusterObj.name); mError("failed to get name from system, set to default val %s", clusterObj.name);
} else { } else {
mDebug("cluster:%d, name is %s", clusterObj.id, clusterObj.name); mDebug("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);
} }
clusterObj.id = MurmurHash3_32(clusterObj.name, TSDB_CLUSTER_ID_LEN); clusterObj.id = MurmurHash3_32(clusterObj.name, TSDB_CLUSTER_ID_LEN);
clusterObj.id = abs(clusterObj.id); clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id);
pMnode->clusterId = clusterObj.id; pMnode->clusterId = clusterObj.id;
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("cluster:%d, will be created while deploy sdb", clusterObj.id); mDebug("cluster:%" PRId64 ", will be created while deploy sdb", clusterObj.id);
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
...@@ -143,8 +143,8 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg ...@@ -143,8 +143,8 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema; SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 4; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_INT; pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema[cols].name, "id"); strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]); pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++; cols++;
...@@ -192,7 +192,7 @@ static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, ...@@ -192,7 +192,7 @@ static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
cols = 0; cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pCluster->id; *(int64_t *)pWrite = pCluster->id;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
...@@ -276,7 +276,7 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { ...@@ -276,7 +276,7 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
static void mndParseStatusMsg(SStatusMsg *pStatus) { static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->sver = htonl(pStatus->sver); pStatus->sver = htonl(pStatus->sver);
pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htonl(pStatus->clusterId); pStatus->clusterId = htobe64(pStatus->clusterId);
pStatus->rebootTime = htobe64(pStatus->rebootTime); pStatus->rebootTime = htobe64(pStatus->rebootTime);
pStatus->updateTime = htobe64(pStatus->updateTime); pStatus->updateTime = htobe64(pStatus->updateTime);
pStatus->numOfCores = htons(pStatus->numOfCores); pStatus->numOfCores = htons(pStatus->numOfCores);
...@@ -323,13 +323,14 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { ...@@ -323,13 +323,14 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
} }
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access, set clusterId %d", pDnode->id, pDnode->ep, pMnode->clusterId); mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
} else { } else {
if (pStatus->clusterId != pMnode->clusterId) { if (pStatus->clusterId != pMnode->clusterId) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
} }
mError("dnode:%d, clusterId %d not match exist %d", pDnode->id, pStatus->clusterId, pMnode->clusterId); mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId,
pMnode->clusterId);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID; terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID;
return -1; return -1;
...@@ -370,7 +371,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { ...@@ -370,7 +371,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.dropped = 0; pRsp->dnodeCfg.dropped = 0;
pRsp->dnodeCfg.clusterId = htonl(pMnode->clusterId); pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
pMsg->contLen = contLen; pMsg->contLen = contLen;
......
...@@ -225,7 +225,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -225,7 +225,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
} }
pRsp->clusterId = htonl(pMnode->clusterId); pRsp->clusterId = htobe64(pMnode->clusterId);
pRsp->connId = htonl(pConn->id); pRsp->connId = htonl(pConn->id);
mndGetMnodeEpSet(pMnode, &pRsp->epSet); mndGetMnodeEpSet(pMnode, &pRsp->epSet);
mndReleaseConn(pMnode, pConn); mndReleaseConn(pMnode, pConn);
......
...@@ -51,6 +51,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); ...@@ -51,6 +51,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
static void mndTransSendRpcRsp(STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans);
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg); static int32_t mndProcessTransMsg(SMnodeMsg *pMsg);
static int32_t mndProcessTransRsp(SMnodeMsg *pMsg);
int32_t mndInitTrans(SMnode *pMnode) { int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TRANS, SSdbTable table = {.sdbType = SDB_TRANS,
...@@ -62,6 +63,7 @@ int32_t mndInitTrans(SMnode *pMnode) { ...@@ -62,6 +63,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndTransActionDelete}; .deleteFp = (SdbDeleteFp)mndTransActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg); mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_RSP, mndProcessTransRsp);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
...@@ -901,4 +903,6 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { ...@@ -901,4 +903,6 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
mndTransExecute(pMnode, pTrans); mndTransExecute(pMnode, pTrans);
sdbRelease(pMnode->pSdb, pTrans); sdbRelease(pMnode->pSdb, pTrans);
} }
} }
\ No newline at end of file
static int32_t mndProcessTransRsp(SMnodeMsg *pMsg) { return 0; }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册