提交 c2323db0 编写于 作者: S Shengliang Guan

fix: allow the ip resolved by fqdn different between dnodes

上级 b05c65cd
...@@ -39,6 +39,7 @@ typedef enum { ...@@ -39,6 +39,7 @@ typedef enum {
QUEUE_MAX, QUEUE_MAX,
} EQueueType; } EQueueType;
typedef int32_t (*UpdateDnodeInfoFp)(void* pData, int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port);
typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg); typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
...@@ -48,6 +49,7 @@ typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type); ...@@ -48,6 +49,7 @@ typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
typedef void (*ReportStartup)(const char* name, const char* desc); typedef void (*ReportStartup)(const char* name, const char* desc);
typedef struct { typedef struct {
void* data;
void* mgmt; void* mgmt;
void* clientRpc; void* clientRpc;
PutToQueueFp putToQueueFp; PutToQueueFp putToQueueFp;
...@@ -57,6 +59,7 @@ typedef struct { ...@@ -57,6 +59,7 @@ typedef struct {
RegisterBrokenLinkArgFp registerBrokenLinkArgFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp; ReleaseHandleFp releaseHandleFp;
ReportStartup reportStartupFp; ReportStartup reportStartupFp;
UpdateDnodeInfoFp updateDnodeInfoFp;
} SMsgCb; } SMsgCb;
void tmsgSetDefault(const SMsgCb* msgcb); void tmsgSetDefault(const SMsgCb* msgcb);
...@@ -67,6 +70,7 @@ void tmsgSendRsp(SRpcMsg* pMsg); ...@@ -67,6 +70,7 @@ void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc); void tmsgReportStartup(const char* name, const char* desc);
int32_t tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -78,6 +78,8 @@ typedef enum { ...@@ -78,6 +78,8 @@ typedef enum {
} ESyncState; } ESyncState;
typedef struct SNodeInfo { typedef struct SNodeInfo {
int64_t clusterId;
int32_t nodeId;
uint16_t nodePort; uint16_t nodePort;
char nodeFqdn[TSDB_FQDN_LEN]; char nodeFqdn[TSDB_FQDN_LEN];
} SNodeInfo; } SNodeInfo;
......
...@@ -180,6 +180,6 @@ int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) { ...@@ -180,6 +180,6 @@ int32_t mmWriteFile(const char *path, const SMnodeOpt *pOption) {
return -1; return -1;
} }
dDebug("successed to write %s, deployed:%d", realfile, pOption->deploy); dDebug("succeed to write %s, deployed:%d", realfile, pOption->deploy);
return 0; return 0;
} }
...@@ -213,6 +213,6 @@ _OVER: ...@@ -213,6 +213,6 @@ _OVER:
if (code != 0) return -1; if (code != 0) return -1;
dDebug("successed to write %s, numOfVnodes:%d", realfile, numOfVnodes); dDebug("succeed to write %s, numOfVnodes:%d", realfile, numOfVnodes);
return taosRenameFile(file, realfile); return taosRenameFile(file, realfile);
} }
\ No newline at end of file
...@@ -134,8 +134,10 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { ...@@ -134,8 +134,10 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo)); memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
for (int i = 0; i < pCreate->replica; ++i) { for (int i = 0; i < pCreate->replica; ++i) {
SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i]; SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
pNode->nodeId = pCreate->replicas[i].id;
pNode->nodePort = pCreate->replicas[i].port; pNode->nodePort = pCreate->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pCreate->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
} }
} }
...@@ -188,8 +190,8 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -188,8 +190,8 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix,
req.hashSuffix, req.replica, req.selfIndex, req.strict); req.hashSuffix, req.replica, req.selfIndex, req.strict);
for (int32_t i = 0; i < req.replica; ++i) { for (int32_t i = 0; i < req.replica; ++i) {
dInfo("vgId:%d, replica:%d id:%d fqdn:%s port:%u", req.vgId, i, req.replicas[i].id, req.replicas[i].fqdn, dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
req.replicas[i].port); req.replicas[i].id);
} }
SReplica *pReplica = &req.replicas[req.selfIndex]; SReplica *pReplica = &req.replicas[req.selfIndex];
......
...@@ -345,6 +345,8 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) { ...@@ -345,6 +345,8 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle, .releaseHandleFp = dmReleaseHandle,
.reportStartupFp = dmReportStartup, .reportStartupFp = dmReportStartup,
.updateDnodeInfoFp = dmUpdateDnodeInfo,
.data = &pDnode->data,
}; };
return msgCb; return msgCb;
} }
...@@ -167,6 +167,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); ...@@ -167,6 +167,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet); void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet);
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
int32_t dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -182,22 +182,25 @@ _OVER: ...@@ -182,22 +182,25 @@ _OVER:
} }
int32_t dmWriteEps(SDnodeData *pData) { int32_t dmWriteEps(SDnodeData *pData) {
int32_t code = -1;
char *content = NULL;
TdFilePtr pFile = NULL;
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP); snprintf(file, sizeof(file), "%s%sdnode%sdnode.json.bak", tsDataDir, TD_DIRSEP, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
dError("failed to write %s since %s", file, strerror(errno)); dError("failed to open %s since %s", file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; goto _OVER;
} }
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 256 * 1024; int32_t maxLen = 256 * 1024;
char *content = taosMemoryCalloc(1, maxLen + 1); content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId); len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId);
...@@ -221,20 +224,39 @@ int32_t dmWriteEps(SDnodeData *pData) { ...@@ -221,20 +224,39 @@ int32_t dmWriteEps(SDnodeData *pData) {
} }
len += snprintf(content + len, maxLen - len, "}\n"); len += snprintf(content + len, maxLen - len, "}\n");
taosWriteFile(pFile, content, len); if (taosWriteFile(pFile, content, len) != len) {
taosFsyncFile(pFile); dError("failed to write %s since %s", file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _OVER;
}
if (taosFsyncFile(pFile) < 0) {
dError("failed to fsync %s since %s", file, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _OVER;
}
taosCloseFile(&pFile); taosCloseFile(&pFile);
taosMemoryFree(content); taosMemoryFreeClear(content);
if (taosRenameFile(file, realfile) != 0) { if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s since %s", file, terrstr()); dError("failed to rename %s since %s", file, terrstr());
return -1; goto _OVER;
} }
code = 0;
pData->updateTime = taosGetTimestampMs(); pData->updateTime = taosGetTimestampMs();
dDebug("successed to write %s, dnodeVer:%" PRId64, realfile, pData->dnodeVer); dInfo("succeed to write %s, dnodeVer:%" PRId64, realfile, pData->dnodeVer);
return 0;
_OVER:
if (content != NULL) taosMemoryFreeClear(content);
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
dError("failed to write file %s since %s", realfile, terrstr());
}
return code;
} }
void dmUpdateEps(SDnodeData *pData, SArray *eps) { void dmUpdateEps(SDnodeData *pData, SArray *eps) {
...@@ -332,3 +354,41 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { ...@@ -332,3 +354,41 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
} }
} }
int32_t dmUpdateDnodeInfo(void *data, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port) {
SDnodeData *pData = data;
int32_t ret = -1;
taosThreadRwlockRdlock(&pData->lock);
if (*dnodeId <= 0) {
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->dnodeEps); ++i) {
SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i);
if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) {
dInfo("dnode:%s:%u, update dnodeId from %d to %d", fqdn, port, *dnodeId, pDnodeEp->id);
*dnodeId = pDnodeEp->id;
*clusterId = pData->clusterId;
ret = 0;
}
}
if (ret != 0) {
dInfo("dnode:%s:%u, failed to update dnodeId:%d", fqdn, port, *dnodeId);
}
} else {
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, dnodeId, sizeof(int32_t));
if (pDnodeEp) {
if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0) {
dInfo("dnode:%d, update port from %s to %s", *dnodeId, fqdn, pDnodeEp->ep.fqdn);
tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
}
if (pDnodeEp->ep.port != *port) {
dInfo("dnode:%d, update port from %u to %u", *dnodeId, *port, pDnodeEp->ep.port);
*port = pDnodeEp->ep.port;
}
*clusterId = pData->clusterId;
ret = 0;
} else {
dInfo("dnode:%d, failed to update dnode info", *dnodeId);
}
}
taosThreadRwlockUnlock(&pData->lock);
return ret;
}
\ No newline at end of file
...@@ -105,7 +105,7 @@ int32_t dmWriteFile(const char *path, const char *name, bool deployed) { ...@@ -105,7 +105,7 @@ int32_t dmWriteFile(const char *path, const char *name, bool deployed) {
return -1; return -1;
} }
dInfo("successed to write %s, deployed:%d", realfile, deployed); dInfo("succeed to write %s, deployed:%d", realfile, deployed);
code = 0; code = 0;
_OVER: _OVER:
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "tmisce.h" #include "tmisce.h"
#include "mndCluster.h"
#define MNODE_VER_NUMBER 1 #define MNODE_VER_NUMBER 1
#define MNODE_RESERVE_SIZE 64 #define MNODE_RESERVE_SIZE 64
...@@ -743,8 +744,12 @@ static void mndReloadSyncConfig(SMnode *pMnode) { ...@@ -743,8 +744,12 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
if (objStatus == SDB_STATUS_READY || objStatus == SDB_STATUS_CREATING) { if (objStatus == SDB_STATUS_READY || objStatus == SDB_STATUS_CREATING) {
SNodeInfo *pNode = &cfg.nodeInfo[cfg.replicaNum]; SNodeInfo *pNode = &cfg.nodeInfo[cfg.replicaNum];
tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, sizeof(pNode->nodeFqdn)); pNode->nodeId = pObj->pDnode->id;
pNode->clusterId = mndGetClusterId(pMnode);
pNode->nodePort = pObj->pDnode->port; pNode->nodePort = pObj->pDnode->port;
tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
mInfo("vgId:1, ep:%s:%u dnode:%d", pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
if (pObj->pDnode->id == pMnode->selfDnodeId) { if (pObj->pDnode->id == pMnode->selfDnodeId) {
cfg.myIndex = cfg.replicaNum; cfg.myIndex = cfg.replicaNum;
} }
...@@ -775,7 +780,8 @@ static void mndReloadSyncConfig(SMnode *pMnode) { ...@@ -775,7 +780,8 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
mInfo("vgId:1, mnode sync reconfig, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex); mInfo("vgId:1, mnode sync reconfig, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex);
for (int32_t i = 0; i < cfg.replicaNum; ++i) { for (int32_t i = 0; i < cfg.replicaNum; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i]; SNodeInfo *pNode = &cfg.nodeInfo[i];
mInfo("vgId:1, index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId,
pNode->clusterId);
} }
int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg); int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndSync.h" #include "mndSync.h"
#include "mndCluster.h"
#include "mndTrans.h" #include "mndTrans.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
...@@ -297,9 +298,12 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -297,9 +298,12 @@ int32_t mndInitSync(SMnode *pMnode) {
pCfg->myIndex = pMgmt->selfIndex; pCfg->myIndex = pMgmt->selfIndex;
for (int32_t i = 0; i < pMgmt->numOfReplicas; ++i) { for (int32_t i = 0; i < pMgmt->numOfReplicas; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i]; SNodeInfo *pNode = &pCfg->nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); pNode->nodeId = pMgmt->replicas[i].id;
pNode->nodePort = pMgmt->replicas[i].port; pNode->nodePort = pMgmt->replicas[i].port;
mInfo("vgId:1, index:%d ep:%s:%u", i, pNode->nodeFqdn, pNode->nodePort); tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:" PRId64, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId,
pNode->clusterId);
} }
tsem_init(&pMgmt->syncSem, 0, 0); tsem_init(&pMgmt->syncSem, 0, 0);
......
...@@ -125,13 +125,17 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { ...@@ -125,13 +125,17 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "vndStats.timeseries", pCfg->vndStats.numOfTimeSeries) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries) < 0) return -1;
SJson *pNodeInfoArr = tjsonCreateArray(); SJson *nodeInfo = tjsonCreateArray();
tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", pNodeInfoArr); if (nodeInfo == NULL) return -1;
if (tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", nodeInfo) < 0) return -1;
for (int i = 0; i < pCfg->syncCfg.replicaNum; ++i) { for (int i = 0; i < pCfg->syncCfg.replicaNum; ++i) {
SJson *pNodeInfo = tjsonCreateObject(); SJson *info = tjsonCreateObject();
tjsonAddIntegerToObject(pNodeInfo, "nodePort", (pCfg->syncCfg.nodeInfo)[i].nodePort); if (info == NULL) return -1;
tjsonAddStringToObject(pNodeInfo, "nodeFqdn", (pCfg->syncCfg.nodeInfo)[i].nodeFqdn); if (tjsonAddIntegerToObject(info, "nodePort", pCfg->syncCfg.nodeInfo[i].nodePort) < 0) return -1;
tjsonAddItemToArray(pNodeInfoArr, pNodeInfo); if (tjsonAddStringToObject(info, "nodeFqdn", pCfg->syncCfg.nodeInfo[i].nodeFqdn) < 0) return -1;
if (tjsonAddIntegerToObject(info, "nodeId", pCfg->syncCfg.nodeInfo[i].nodeId) < 0) return -1;
if (tjsonAddIntegerToObject(info, "clusterId", pCfg->syncCfg.nodeInfo[i].clusterId) < 0) return -1;
if (tjsonAddItemToArray(nodeInfo, info) < 0) return -1;
} }
return 0; return 0;
...@@ -240,15 +244,19 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { ...@@ -240,15 +244,19 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
tjsonGetNumberValue(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries, code); tjsonGetNumberValue(pJson, "vndStats.ntimeseries", pCfg->vndStats.numOfNTimeSeries, code);
if (code < 0) return -1; if (code < 0) return -1;
SJson *pNodeInfoArr = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo"); SJson *nodeInfo = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo");
int arraySize = tjsonGetArraySize(pNodeInfoArr); int arraySize = tjsonGetArraySize(nodeInfo);
assert(arraySize == pCfg->syncCfg.replicaNum); if (arraySize != pCfg->syncCfg.replicaNum) return -1;
for (int i = 0; i < arraySize; ++i) { for (int i = 0; i < arraySize; ++i) {
SJson *pNodeInfo = tjsonGetArrayItem(pNodeInfoArr, i); SJson *info = tjsonGetArrayItem(nodeInfo, i);
assert(pNodeInfo != NULL); if (info == NULL) return -1;
tjsonGetNumberValue(pNodeInfo, "nodePort", (pCfg->syncCfg.nodeInfo)[i].nodePort, code); tjsonGetNumberValue(info, "nodePort", pCfg->syncCfg.nodeInfo[i].nodePort, code);
tjsonGetStringValue(pNodeInfo, "nodeFqdn", (pCfg->syncCfg.nodeInfo)[i].nodeFqdn); if (code < 0) return -1;
tjsonGetStringValue(info, "nodeFqdn", pCfg->syncCfg.nodeInfo[i].nodeFqdn);
if (code < 0) return -1;
tjsonGetNumberValue(info, "nodeId", pCfg->syncCfg.nodeInfo[i].nodeId, code);
tjsonGetNumberValue(info, "clusterId", pCfg->syncCfg.nodeInfo[i].clusterId, code);
} }
tjsonGetNumberValue(pJson, "tsdbPageSize", pCfg->tsdbPageSize, code); tjsonGetNumberValue(pJson, "tsdbPageSize", pCfg->tsdbPageSize, code);
......
...@@ -82,8 +82,10 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { ...@@ -82,8 +82,10 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
vInfo("vgId:%d, save config, replicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->myIndex); vInfo("vgId:%d, save config, replicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->myIndex);
for (int i = 0; i < pReq->replica; ++i) { for (int i = 0; i < pReq->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i]; SNodeInfo *pNode = &pCfg->nodeInfo[i];
pNode->nodeId = pReq->replicas[i].id;
pNode->nodePort = pReq->replicas[i].port; pNode->nodePort = pReq->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort); vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort);
} }
......
...@@ -578,7 +578,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -578,7 +578,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex); vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->replicaNum; ++i) { for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i]; SNodeInfo *pNode = &pCfg->nodeInfo[i];
vInfo("vgId:%d, index:%d ep:%s:%u", pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort); vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId, pNode->clusterId);
} }
pVnode->sync = syncOpen(&syncInfo); pVnode->sync = syncOpen(&syncInfo);
......
...@@ -53,6 +53,18 @@ typedef struct SyncPreSnapshot SyncPreSnapshot; ...@@ -53,6 +53,18 @@ typedef struct SyncPreSnapshot SyncPreSnapshot;
typedef struct SSyncLogBuffer SSyncLogBuffer; typedef struct SSyncLogBuffer SSyncLogBuffer;
typedef struct SSyncLogReplMgr SSyncLogReplMgr; typedef struct SSyncLogReplMgr SSyncLogReplMgr;
#define MAX_CONFIG_INDEX_COUNT 256
typedef struct SRaftCfg {
SSyncCfg cfg;
int32_t batchSize;
int8_t isStandBy;
int8_t snapshotStrategy;
SyncIndex lastConfigIndex;
int32_t configIndexCount;
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
} SRaftCfg;
typedef struct SRaftId { typedef struct SRaftId {
SyncNodeId addr; SyncNodeId addr;
SyncGroupId vgId; SyncGroupId vgId;
...@@ -93,7 +105,7 @@ typedef struct SPeerState { ...@@ -93,7 +105,7 @@ typedef struct SPeerState {
typedef struct SSyncNode { typedef struct SSyncNode {
// init by SSyncInfo // init by SSyncInfo
SyncGroupId vgId; SyncGroupId vgId;
SRaftCfg* pRaftCfg; SRaftCfg raftCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
char raftStorePath[TSDB_FILENAME_LEN * 2]; char raftStorePath[TSDB_FILENAME_LEN * 2];
char configPath[TSDB_FILENAME_LEN * 2]; char configPath[TSDB_FILENAME_LEN * 2];
...@@ -112,6 +124,7 @@ typedef struct SSyncNode { ...@@ -112,6 +124,7 @@ typedef struct SSyncNode {
int32_t peersNum; int32_t peersNum;
SNodeInfo peersNodeInfo[TSDB_MAX_REPLICA]; SNodeInfo peersNodeInfo[TSDB_MAX_REPLICA];
SEpSet peersEpset[TSDB_MAX_REPLICA];
SRaftId peersId[TSDB_MAX_REPLICA]; SRaftId peersId[TSDB_MAX_REPLICA];
int32_t replicaNum; int32_t replicaNum;
...@@ -245,7 +258,6 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); ...@@ -245,7 +258,6 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
// utils -------------- // utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h); int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode); bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode);
......
...@@ -22,64 +22,9 @@ extern "C" { ...@@ -22,64 +22,9 @@ extern "C" {
#include "syncInt.h" #include "syncInt.h"
#define CONFIG_FILE_LEN 2048 int32_t syncWriteCfgFile(SSyncNode *pNode);
#define MAX_CONFIG_INDEX_COUNT 256 int32_t syncReadCfgFile(SSyncNode *pNode);
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex);
typedef struct SRaftCfgIndex {
TdFilePtr pFile;
char path[TSDB_FILENAME_LEN * 2];
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
int32_t configIndexCount;
} SRaftCfgIndex;
SRaftCfgIndex *raftCfgIndexOpen(const char *path);
int32_t raftCfgIndexClose(SRaftCfgIndex *pRaftCfgIndex);
int32_t raftCfgIndexPersist(SRaftCfgIndex *pRaftCfgIndex);
int32_t raftCfgIndexAddConfigIndex(SRaftCfgIndex *pRaftCfgIndex, SyncIndex configIndex);
cJSON *raftCfgIndex2Json(SRaftCfgIndex *pRaftCfgIndex);
char *raftCfgIndex2Str(SRaftCfgIndex *pRaftCfgIndex);
int32_t raftCfgIndexFromJson(const cJSON *pRoot, SRaftCfgIndex *pRaftCfgIndex);
int32_t raftCfgIndexFromStr(const char *s, SRaftCfgIndex *pRaftCfgIndex);
int32_t raftCfgIndexCreateFile(const char *path);
typedef struct SRaftCfg {
SSyncCfg cfg;
TdFilePtr pFile;
char path[TSDB_FILENAME_LEN * 2];
int8_t isStandBy;
int32_t batchSize;
int8_t snapshotStrategy;
SyncIndex lastConfigIndex;
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
int32_t configIndexCount;
} SRaftCfg;
SRaftCfg *raftCfgOpen(const char *path);
int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
void syncCfg2SimpleStr(const SSyncCfg *pCfg, char *str, int32_t bufLen);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
typedef struct SRaftCfgMeta {
int8_t isStandBy;
int32_t batchSize;
int8_t snapshotStrategy;
SyncIndex lastConfigIndex;
} SRaftCfgMeta;
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -62,22 +62,19 @@ extern "C" { ...@@ -62,22 +62,19 @@ extern "C" {
// clang-format on // clang-format on
uint64_t syncUtilAddr2U64(const char* host, uint16_t port); #define CID(pRaftId) (int32_t)(((pRaftId)->addr) >> 32)
void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port); #define DID(pRaftId) (int32_t)((pRaftId)->addr)
void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet); #define SYNC_ADDR(pInfo) (int64_t)(((pInfo)->clusterId << 32) | (pInfo)->nodeId)
void syncUtilRaftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId); void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilEmptyId(const SRaftId* pId); bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
bool syncUtilEmptyId(const SRaftId* pId);
int32_t syncUtilElectRandomMS(int32_t min, int32_t max); int32_t syncUtilElectRandomMS(int32_t min, int32_t max);
int32_t syncUtilQuorum(int32_t replicaNum); int32_t syncUtilQuorum(int32_t replicaNum);
cJSON* syncUtilRaftId2Json(const SRaftId* p);
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
char* syncUtilPrintBin(char* ptr, uint32_t len);
char* syncUtilPrintBin2(char* ptr, uint32_t len);
void syncUtilMsgHtoN(void* msg); void syncUtilMsgHtoN(void* msg);
void syncUtilMsgNtoH(void* msg);
bool syncUtilUserPreCommit(tmsg_t msgType); bool syncUtilUserPreCommit(tmsg_t msgType);
bool syncUtilUserRollback(tmsg_t msgType); bool syncUtilUserRollback(tmsg_t msgType);
......
...@@ -94,7 +94,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -94,7 +94,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); pSyncNode->quorum = syncUtilQuorum(pSyncNode->raftCfg.cfg.replicaNum);
syncNodeCandidate2Leader(pSyncNode); syncNodeCandidate2Leader(pSyncNode);
pSyncNode->pVotesGranted->toLeader = true; pSyncNode->pVotesGranted->toLeader = true;
......
...@@ -115,7 +115,7 @@ void syncHbTimerDataRemove(int64_t rid) { taosRemoveRef(gHbDataRefId, rid); } ...@@ -115,7 +115,7 @@ void syncHbTimerDataRemove(int64_t rid) { taosRemoveRef(gHbDataRefId, rid); }
SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) { SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid); SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid);
if (pData == NULL) { if (pData == NULL) {
sError("failed to acquire hb-timer-data from refId:%" PRId64, rid); sInfo("failed to acquire hb-timer-data from refId:%" PRId64, rid);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
} }
......
...@@ -64,10 +64,8 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, Sync ...@@ -64,10 +64,8 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, Sync
} }
} }
char host[128]; sError("vgId:%d, indexmgr set index:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, index,
uint16_t port; DID(pRaftId), CID(pRaftId));
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr set index:%" PRId64 " for %s:%d failed", pIndexMgr->pNode->vgId, index, host, port);
} }
SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) { SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) {
...@@ -77,10 +75,7 @@ SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) { ...@@ -77,10 +75,7 @@ SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) {
} }
} }
char host[128]; sError("vgId:%d, indexmgr get replmgr from dnode:%d cluster:%d failed", pNode->vgId, DID(pRaftId), CID(pRaftId));
uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get replmgr from %s:%d failed", pNode->vgId, host, port);
return NULL; return NULL;
} }
...@@ -92,10 +87,8 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) ...@@ -92,10 +87,8 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId)
} }
} }
char host[128]; sError("vgId:%d, indexmgr get index from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
uint16_t port; CID(pRaftId));
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get index from %s:%d failed", pIndexMgr->pNode->vgId, host, port);
return SYNC_INDEX_INVALID; return SYNC_INDEX_INVALID;
} }
...@@ -107,11 +100,8 @@ void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, ...@@ -107,11 +100,8 @@ void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId,
} }
} }
char host[128]; sError("vgId:%d, indexmgr set start-time:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId,
uint16_t port; startTime, DID(pRaftId), CID(pRaftId));
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr set start-time:%" PRId64 " for %s:%d failed", pIndexMgr->pNode->vgId, startTime, host,
port);
} }
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) { int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
...@@ -122,10 +112,8 @@ int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftI ...@@ -122,10 +112,8 @@ int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftI
} }
} }
char host[128]; sError("vgId:%d, indexmgr get start-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
uint16_t port; CID(pRaftId));
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get start-time from %s:%d failed", pIndexMgr->pNode->vgId, host, port);
return -1; return -1;
} }
...@@ -137,10 +125,8 @@ void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, i ...@@ -137,10 +125,8 @@ void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, i
} }
} }
char host[128]; sError("vgId:%d, indexmgr set recv-time:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, recvTime,
uint16_t port; DID(pRaftId), CID(pRaftId));
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr set recv-time:%" PRId64 " for %s:%d failed", pIndexMgr->pNode->vgId, recvTime, host, port);
} }
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) { int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
...@@ -151,10 +137,8 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId ...@@ -151,10 +137,8 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId
} }
} }
char host[128]; sError("vgId:%d, indexmgr get recv-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
uint16_t port; CID(pRaftId));
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get recv-time from %s:%d failed", pIndexMgr->pNode->vgId, host, port);
return -1; return -1;
} }
...@@ -166,10 +150,8 @@ void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncT ...@@ -166,10 +150,8 @@ void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncT
} }
} }
char host[128]; sError("vgId:%d, indexmgr set term:%" PRId64 " for dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, term,
uint16_t port; DID(pRaftId), CID(pRaftId));
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr set term:%" PRId64 " for %s:%d failed", pIndexMgr->pNode->vgId, term, host, port);
} }
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) { SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
...@@ -180,9 +162,7 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) { ...@@ -180,9 +162,7 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
} }
} }
char host[128]; sError("vgId:%d, indexmgr get term from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
uint16_t port; CID(pRaftId));
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get term from %s:%d failed", pIndexMgr->pNode->vgId, host, port);
return -1; return -1;
} }
此差异已折叠。
...@@ -652,18 +652,15 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -652,18 +652,15 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
SRaftId destId = pMsg->srcId; SRaftId destId = pMsg->srcId;
ASSERT(pMgr->restored == false); ASSERT(pMgr->restored == false);
char host[64];
uint16_t port;
syncUtilU642Addr(destId.addr, host, sizeof(host), &port);
if (pMgr->endIndex == 0) { if (pMgr->endIndex == 0) {
ASSERT(pMgr->startIndex == 0); ASSERT(pMgr->startIndex == 0);
ASSERT(pMgr->matchIndex == 0); ASSERT(pMgr->matchIndex == 0);
if (pMsg->matchIndex < 0) { if (pMsg->matchIndex < 0) {
pMgr->restored = true; pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 sInfo("vgId:%d, sync log repl mgr restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0; return 0;
} }
...@@ -678,21 +675,21 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -678,21 +675,21 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->matchIndex = pMsg->matchIndex; pMgr->matchIndex = pMsg->matchIndex;
pMgr->restored = true; pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 sInfo("vgId:%d, sync log repl mgr restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, host, port, destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0; return 0;
} }
if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) { if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) {
sWarn("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64, sWarn("vgId:%d, failed to rollback match index. peer: dnode:%d, match index: %" PRId64 ", last sent: %" PRId64,
pNode->vgId, host, port, pMsg->matchIndex, pMsg->lastSendIndex); pNode->vgId, DID(&destId), pMsg->matchIndex, pMsg->lastSendIndex);
if (syncNodeStartSnapshot(pNode, &destId) < 0) { if (syncNodeStartSnapshot(pNode, &destId) < 0) {
sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
return -1; return -1;
} }
sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port); sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId));
return 0; return 0;
} }
} }
...@@ -707,10 +704,10 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -707,10 +704,10 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
if (term < 0 || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { if (term < 0 || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
if (syncNodeStartSnapshot(pNode, &destId) < 0) { if (syncNodeStartSnapshot(pNode, &destId) < 0) {
sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, DID(&destId));
return -1; return -1;
} }
sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port); sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, DID(&destId));
return 0; return 0;
} }
......
此差异已折叠。
...@@ -112,13 +112,6 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { ...@@ -112,13 +112,6 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
uint64_t u64 = pRaftStore->voteFor.addr;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pRoot, "addr_host", host);
cJSON_AddNumberToObject(pRoot, "addr_port", port);
char *serialized = cJSON_Print(pRoot); char *serialized = cJSON_Print(pRoot);
int len2 = strlen(serialized); int len2 = strlen(serialized);
ASSERT(len2 < len); ASSERT(len2 < len);
......
...@@ -107,10 +107,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh ...@@ -107,10 +107,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
pMsg = rpcMsg.pCont; pMsg = rpcMsg.pCont;
} else { } else {
char host[64]; sNError(pSyncNode, "replicate to dnode:%d error, next-index:%" PRId64, DID(pDestId), nextIndex);
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
sNError(pSyncNode, "replicate to %s:%d error, next-index:%" PRId64, host, port, nextIndex);
return -1; return -1;
} }
} }
...@@ -171,10 +168,7 @@ int32_t syncNodeReplicateOld(SSyncNode* pSyncNode) { ...@@ -171,10 +168,7 @@ int32_t syncNodeReplicateOld(SSyncNode* pSyncNode) {
SRaftId* pDestId = &(pSyncNode->peersId[i]); SRaftId* pDestId = &(pSyncNode->peersId[i]);
ret = syncNodeReplicateOne(pSyncNode, pDestId, true); ret = syncNodeReplicateOne(pSyncNode, pDestId, true);
if (ret != 0) { if (ret != 0) {
char host[64]; sError("vgId:%d, do append entries error for %s:%d", pSyncNode->vgId, DID(pDestId));
int16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
sError("vgId:%d, do append entries error for %s:%d", pSyncNode->vgId, host, port);
} }
} }
...@@ -183,7 +177,6 @@ int32_t syncNodeReplicateOld(SSyncNode* pSyncNode) { ...@@ -183,7 +177,6 @@ int32_t syncNodeReplicateOld(SSyncNode* pSyncNode) {
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont; SyncAppendEntries* pMsg = pRpcMsg->pCont;
int32_t ret = 0;
pMsg->destId = *destRaftId; pMsg->destId = *destRaftId;
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
return 0; return 0;
...@@ -229,11 +222,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest ...@@ -229,11 +222,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) { if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) {
ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pRpcMsg); ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pRpcMsg);
} else { } else {
char logBuf[128]; sNTrace(pSyncNode, "do not repcate to dnode:%d for index:%" PRId64, DID(destRaftId), pMsg->prevLogIndex + 1);
char host[64];
int16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "do not repcate to %s:%d for index:%" PRId64, host, port, pMsg->prevLogIndex + 1);
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
} }
......
...@@ -24,33 +24,35 @@ ...@@ -24,33 +24,35 @@
#include "syncUtil.h" #include "syncUtil.h"
static void syncNodeCleanConfigIndex(SSyncNode* ths) { static void syncNodeCleanConfigIndex(SSyncNode* ths) {
#if 0
int32_t newArrIndex = 0; int32_t newArrIndex = 0;
SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT] = {0}; SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT] = {0};
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex != SYNC_INDEX_INVALID) { if (snapshot.lastApplyIndex != SYNC_INDEX_INVALID) {
for (int32_t i = 0; i < ths->pRaftCfg->configIndexCount; ++i) { for (int32_t i = 0; i < ths->raftCfg.configIndexCount; ++i) {
if (ths->pRaftCfg->configIndexArr[i] < snapshot.lastConfigIndex) { if (ths->raftCfg.configIndexArr[i] < snapshot.lastConfigIndex) {
// pass // pass
} else { } else {
// save // save
newConfigIndexArr[newArrIndex] = ths->pRaftCfg->configIndexArr[i]; newConfigIndexArr[newArrIndex] = ths->raftCfg.configIndexArr[i];
++newArrIndex; ++newArrIndex;
} }
} }
int32_t oldCnt = ths->pRaftCfg->configIndexCount; int32_t oldCnt = ths->raftCfg.configIndexCount;
ths->pRaftCfg->configIndexCount = newArrIndex; ths->raftCfg.configIndexCount = newArrIndex;
memcpy(ths->pRaftCfg->configIndexArr, newConfigIndexArr, sizeof(newConfigIndexArr)); memcpy(ths->raftCfg.configIndexArr, newConfigIndexArr, sizeof(newConfigIndexArr));
int32_t code = raftCfgPersist(ths->pRaftCfg); int32_t code = syncWriteCfgFile(ths);
if (code != 0) { if (code != 0) {
sNFatal(ths, "failed to persist cfg"); sNFatal(ths, "failed to persist cfg");
} else { } else {
sNTrace(ths, "clean config index arr, old-cnt:%d, new-cnt:%d", oldCnt, ths->pRaftCfg->configIndexCount); sNTrace(ths, "clean config index arr, old-cnt:%d, new-cnt:%d", oldCnt, ths->raftCfg.configIndexCount);
} }
} }
#endif
} }
static int32_t syncNodeTimerRoutine(SSyncNode* ths) { static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
......
此差异已折叠。
...@@ -70,3 +70,47 @@ int main(int argc, char** argv) { ...@@ -70,3 +70,47 @@ int main(int argc, char** argv) {
taosCloseLog(); taosCloseLog();
return 0; return 0;
} }
static inline bool syncUtilCanPrint(char c) {
if (c >= 32 && c <= 126) {
return true;
} else {
return false;
}
}
char* syncUtilPrintBin(char* ptr, uint32_t len) {
int64_t memLen = (int64_t)(len + 1);
char* s = taosMemoryMalloc(memLen);
ASSERT(s != NULL);
memset(s, 0, len + 1);
memcpy(s, ptr, len);
for (int32_t i = 0; i < len; ++i) {
if (!syncUtilCanPrint(s[i])) {
s[i] = '.';
}
}
return s;
}
char* syncUtilPrintBin2(char* ptr, uint32_t len) {
uint32_t len2 = len * 4 + 1;
char* s = taosMemoryMalloc(len2);
ASSERT(s != NULL);
memset(s, 0, len2);
char* p = s;
for (int32_t i = 0; i < len; ++i) {
int32_t n = sprintf(p, "%d,", ptr[i]);
p += n;
}
return s;
}
void syncUtilMsgNtoH(void* msg) {
SMsgHead* pHead = msg;
pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId);
}
...@@ -200,8 +200,8 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ...@@ -200,8 +200,8 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d", "lcfg:%" PRId64 ", chging:%d, rsto:%d",
pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex,
logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->raftCfg.isStandBy, pSyncNode->replicaNum,
pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish); pSyncNode->raftCfg.lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
return s; return s;
} }
...@@ -243,7 +243,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) { ...@@ -243,7 +243,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
int32_t syncNodePingAll(SSyncNode* pSyncNode) { int32_t syncNodePingAll(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.replicaNum; ++i) {
SRaftId* destId = &(pSyncNode->replicasId[i]); SRaftId* destId = &(pSyncNode->replicasId[i]);
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId);
ret = syncNodePing(pSyncNode, destId, pMsg); ret = syncNodePing(pSyncNode, destId, pMsg);
......
...@@ -24,7 +24,6 @@ static SMsgCb defaultMsgCb; ...@@ -24,7 +24,6 @@ static SMsgCb defaultMsgCb;
void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; } void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; }
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) { int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
ASSERT(msgcb != NULL);
int32_t code = (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg); int32_t code = (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg);
if (code != 0) { if (code != 0) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -59,3 +58,7 @@ void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { (*defaultMsgCb.registerBrokenLin ...@@ -59,3 +58,7 @@ void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { (*defaultMsgCb.registerBrokenLin
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.releaseHandleFp)(pHandle, type); } void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.releaseHandleFp)(pHandle, type); }
void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); } void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); }
int32_t tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) {
return (*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port);
}
...@@ -640,7 +640,7 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) { ...@@ -640,7 +640,7 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) {
int32_t taosFsyncFile(TdFilePtr pFile) { int32_t taosFsyncFile(TdFilePtr pFile) {
if (pFile == NULL) { if (pFile == NULL) {
return 0; return -1;
} }
// this implementation is WRONG // this implementation is WRONG
......
...@@ -181,7 +181,7 @@ typedef struct _script_t { ...@@ -181,7 +181,7 @@ typedef struct _script_t {
extern SScript *simScriptList[MAX_MAIN_SCRIPT_NUM]; extern SScript *simScriptList[MAX_MAIN_SCRIPT_NUM];
extern SCommand simCmdList[]; extern SCommand simCmdList[];
extern int32_t simScriptPos; extern int32_t simScriptPos;
extern int32_t simScriptSucced; extern int32_t simScriptSucceed;
extern int32_t simDebugFlag; extern int32_t simDebugFlag;
extern char simScriptDir[]; extern char simScriptDir[];
extern bool abortExecution; extern bool abortExecution;
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
SScript *simScriptList[MAX_MAIN_SCRIPT_NUM]; SScript *simScriptList[MAX_MAIN_SCRIPT_NUM];
SCommand simCmdList[SIM_CMD_END]; SCommand simCmdList[SIM_CMD_END];
int32_t simScriptPos = -1; int32_t simScriptPos = -1;
int32_t simScriptSucced = 0; int32_t simScriptSucceed = 0;
int32_t simDebugFlag = 143; int32_t simDebugFlag = 143;
void simCloseTaosdConnect(SScript *script); void simCloseTaosdConnect(SScript *script);
char simScriptDir[PATH_MAX] = {0}; char simScriptDir[PATH_MAX] = {0};
...@@ -88,13 +88,13 @@ SScript *simProcessCallOver(SScript *script) { ...@@ -88,13 +88,13 @@ SScript *simProcessCallOver(SScript *script) {
} }
simCloseTaosdConnect(script); simCloseTaosdConnect(script);
simScriptSucced++; simScriptSucceed++;
simScriptPos--; simScriptPos--;
simFreeScript(script); simFreeScript(script);
if (simScriptPos == -1 && simExecSuccess) { if (simScriptPos == -1 && simExecSuccess) {
simInfo("----------------------------------------------------------------------"); simInfo("----------------------------------------------------------------------");
simInfo("Simulation Test Done, " SUCCESS_PREFIX "%d" SUCCESS_POSTFIX " Passed:\n", simScriptSucced); simInfo("Simulation Test Done, " SUCCESS_PREFIX "%d" SUCCESS_POSTFIX " Passed:\n", simScriptSucceed);
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册