提交 80fe2e00 编写于 作者: S slguan

[TD-52] fix error while syncing

上级 584ccc43
...@@ -59,6 +59,7 @@ extern uint16_t tsMnodeDnodePort; ...@@ -59,6 +59,7 @@ extern uint16_t tsMnodeDnodePort;
extern uint16_t tsMnodeShellPort; extern uint16_t tsMnodeShellPort;
extern uint16_t tsDnodeShellPort; extern uint16_t tsDnodeShellPort;
extern uint16_t tsDnodeMnodePort; extern uint16_t tsDnodeMnodePort;
extern uint16_t tsSyncPort;
extern int tsStatusInterval; extern int tsStatusInterval;
extern int tsShellActivityTimer; extern int tsShellActivityTimer;
......
...@@ -65,7 +65,8 @@ char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0}; ...@@ -65,7 +65,8 @@ char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0};
uint16_t tsMnodeShellPort = 6030; // udp[6030-6034] tcp[6030] uint16_t tsMnodeShellPort = 6030; // udp[6030-6034] tcp[6030]
uint16_t tsDnodeShellPort = 6035; // udp[6035-6039] tcp[6035] uint16_t tsDnodeShellPort = 6035; // udp[6035-6039] tcp[6035]
uint16_t tsMnodeDnodePort = 6040; // udp/tcp uint16_t tsMnodeDnodePort = 6040; // udp/tcp
uint16_t tsDnodeMnodePort = 6041; // udp/tcp uint16_t tsDnodeMnodePort = 6045; // udp/tcp
uint16_t tsSyncPort = 6050;
int32_t tsStatusInterval = 1; // second int32_t tsStatusInterval = 1; // second
int32_t tsShellActivityTimer = 3; // second int32_t tsShellActivityTimer = 3; // second
......
...@@ -34,26 +34,28 @@ ...@@ -34,26 +34,28 @@
#define MPEER_CONTENT_LEN 2000 #define MPEER_CONTENT_LEN 2000
static bool dnodeReadMnodeIpList(); static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
static void dnodeSaveMnodeIpList(); static bool dnodeReadMnodeInfos();
static void dnodeReadDnodeInfo(); static void dnodeSaveMnodeInfos();
static void dnodeUpdateDnodeInfo(int32_t dnodeId); static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg);
static bool dnodeReadDnodeCfg();
static void dnodeSaveDnodeCfg();
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg); static void dnodeProcessRspFromMnode(SRpcMsg *pMsg);
static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId); static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void *tsDnodeMClientRpc = NULL; static void *tsDnodeMClientRpc = NULL;
static SRpcIpSet tsMnodeIpList = {0};
static SDMNodeInfos tsMnodeInfos = {0};
static void *tsDnodeTmr = NULL; static void *tsDnodeTmr = NULL;
static void *tsStatusTimer = NULL; static void *tsStatusTimer = NULL;
static uint32_t tsRebootTime; static uint32_t tsRebootTime;
static int32_t tsDnodeId = 0;
static char tsDnodeName[TSDB_NODE_NAME_LEN]; static SRpcIpSet tsMnodeIpSet = {0};
static SDMMnodeInfos tsMnodeInfos = {0};
static SDMDnodeCfg tsDnodeCfg = {0};
int32_t dnodeInitMClient() { int32_t dnodeInitMClient() {
dnodeReadDnodeInfo(); dnodeReadDnodeCfg();
tsRebootTime = taosGetTimestampSec(); tsRebootTime = taosGetTimestampSec();
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
...@@ -62,22 +64,22 @@ int32_t dnodeInitMClient() { ...@@ -62,22 +64,22 @@ int32_t dnodeInitMClient() {
return -1; return -1;
} }
if (!dnodeReadMnodeIpList()) { if (!dnodeReadMnodeInfos()) {
memset(&tsMnodeIpList, 0, sizeof(SRpcIpSet)); memset(&tsMnodeIpSet, 0, sizeof(SRpcIpSet));
memset(&tsMnodeInfos, 0, sizeof(SDMNodeInfos)); memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos));
tsMnodeIpList.port = tsMnodeDnodePort; tsMnodeIpSet.port = tsMnodeDnodePort;
tsMnodeIpList.numOfIps = 1; tsMnodeIpSet.numOfIps = 1;
tsMnodeIpList.ip[0] = inet_addr(tsMasterIp); tsMnodeIpSet.ip[0] = inet_addr(tsMasterIp);
if (strcmp(tsSecondIp, tsMasterIp) != 0) { if (strcmp(tsSecondIp, tsMasterIp) != 0) {
tsMnodeIpList.numOfIps = 2; tsMnodeIpSet.numOfIps = 2;
tsMnodeIpList.ip[1] = inet_addr(tsSecondIp); tsMnodeIpSet.ip[1] = inet_addr(tsSecondIp);
} }
} else { } else {
tsMnodeIpList.inUse = tsMnodeInfos.inUse; tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
tsMnodeIpList.numOfIps = tsMnodeInfos.nodeNum; tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
tsMnodeIpList.port = tsMnodeInfos.nodeInfos[0].nodePort; tsMnodeIpSet.port = tsMnodeInfos.nodeInfos[0].nodePort;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
tsMnodeIpList.ip[i] = tsMnodeInfos.nodeInfos[i].nodeIp; tsMnodeIpSet.ip[i] = tsMnodeInfos.nodeInfos[i].nodeIp;
} }
} }
...@@ -144,58 +146,68 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -144,58 +146,68 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
} }
SDMStatusRsp *pStatusRsp = pMsg->pCont; SDMStatusRsp *pStatusRsp = pMsg->pCont;
SDMNodeInfos *mnodes = &pStatusRsp->mnodes; SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes;
if (mnodes->nodeNum <= 0) { if (pMnodes->nodeNum <= 0) {
dError("status msg is invalid, num of ips is %d", mnodes->nodeNum); dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return; return;
} }
SDnodeState *pState = &pStatusRsp->dnodeState; SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pState->numOfVnodes = htonl(pState->numOfVnodes); pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
pState->moduleStatus = htonl(pState->moduleStatus); pCfg->moduleStatus = htonl(pCfg->moduleStatus);
pState->createdTime = htonl(pState->createdTime); pCfg->dnodeId = htonl(pCfg->dnodeId);
pState->dnodeId = htonl(pState->dnodeId);
for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
dnodeProcessModuleStatus(pState->moduleStatus); SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
dnodeUpdateDnodeInfo(pState->dnodeId); pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId);
pMnodeInfo->nodeIp = htonl(pMnodeInfo->nodeIp);
SRpcIpSet mgmtIpSet = {0}; pMnodeInfo->nodePort = htons(pMnodeInfo->nodePort);
mgmtIpSet.inUse = mnodes->inUse; pMnodeInfo->syncPort = htons(pMnodeInfo->syncPort);
mgmtIpSet.numOfIps = mnodes->nodeNum;
mgmtIpSet.port = htons(mnodes->nodeInfos[0].nodePort);
for (int32_t i = 0; i < mnodes->nodeNum; i++) {
mgmtIpSet.ip[i] = htonl(mnodes->nodeInfos[i].nodeIp);
}
if (memcmp(&mgmtIpSet, &tsMnodeIpList, sizeof(SRpcIpSet)) != 0 || tsMnodeInfos.nodeNum == 0) {
memcpy(&tsMnodeIpList, &mgmtIpSet, sizeof(SRpcIpSet));
tsMnodeInfos.inUse = mnodes->inUse;
tsMnodeInfos.nodeNum = mnodes->nodeNum;
dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
for (int32_t i = 0; i < mnodes->nodeNum; i++) {
tsMnodeInfos.nodeInfos[i].nodeId = htonl(mnodes->nodeInfos[i].nodeId);
tsMnodeInfos.nodeInfos[i].nodeIp = htonl(mnodes->nodeInfos[i].nodeIp);
tsMnodeInfos.nodeInfos[i].nodePort = htons(mnodes->nodeInfos[i].nodePort);
strcpy(tsMnodeInfos.nodeInfos[i].nodeName, mnodes->nodeInfos[i].nodeName);
dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId,
taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp), tsMnodeInfos.nodeInfos[i].nodePort,
tsMnodeInfos.nodeInfos[i].nodeName);
}
dnodeSaveMnodeIpList();
sdbUpdateSync();
} }
SDMVgroupAccess *pVgAcccess = pStatusRsp->vgAccess;
for (int32_t i = 0; i < pCfg->numOfVnodes; ++i) {
pVgAcccess[i].vgId = htonl(pVgAcccess[i].vgId);
}
dnodeProcessModuleStatus(pCfg->moduleStatus);
dnodeUpdateDnodeCfg(pCfg);
dnodeUpdateMnodeInfos(pMnodes);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
} }
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
bool mnodesChanged = (memcmp(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0);
bool mnodesNotInit = (tsMnodeInfos.nodeNum == 0);
if (!(mnodesChanged || mnodesNotInit)) return;
memcpy(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos));
tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
tsMnodeIpSet.port = tsMnodeInfos.nodeInfos[0].nodePort;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
tsMnodeIpSet.ip[i] = tsMnodeInfos.nodeInfos[i].nodeIp;
}
dPrint("mnodes is changed, nodeNum:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId, taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp),
tsMnodeInfos.nodeInfos[i].nodePort, tsMnodeInfos.nodeInfos[i].nodeName);
}
dnodeSaveMnodeInfos();
sdbUpdateSync();
}
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
if (tsDnodeMClientRpc) { if (tsDnodeMClientRpc) {
rpcSendRequest(tsDnodeMClientRpc, &tsMnodeIpList, rpcMsg); rpcSendRequest(tsDnodeMClientRpc, &tsMnodeIpSet, rpcMsg);
} }
} }
static bool dnodeReadMnodeIpList() { static bool dnodeReadMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0}; char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir); sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "r"); FILE *fp = fopen(ipFile, "r");
...@@ -272,6 +284,13 @@ static bool dnodeReadMnodeIpList() { ...@@ -272,6 +284,13 @@ static bool dnodeReadMnodeIpList() {
} }
tsMnodeInfos.nodeInfos[i].nodePort = (uint16_t)nodePort->valueint; tsMnodeInfos.nodeInfos[i].nodePort = (uint16_t)nodePort->valueint;
cJSON *syncPort = cJSON_GetObjectItem(nodeInfo, "syncPort");
if (!syncPort || syncPort->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, syncPort not found");
goto PARSE_OVER;
}
tsMnodeInfos.nodeInfos[i].syncPort = (uint16_t)syncPort->valueint;
cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName"); cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName");
if (!nodeIp || nodeName->type != cJSON_String || nodeName->valuestring == NULL) { if (!nodeIp || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
dError("failed to read mnode mgmtIpList.json, nodeName not found"); dError("failed to read mnode mgmtIpList.json, nodeName not found");
...@@ -296,7 +315,7 @@ PARSE_OVER: ...@@ -296,7 +315,7 @@ PARSE_OVER:
return ret; return ret;
} }
static void dnodeSaveMnodeIpList() { static void dnodeSaveMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0}; char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir); sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "w"); FILE *fp = fopen(ipFile, "w");
...@@ -314,6 +333,7 @@ static void dnodeSaveMnodeIpList() { ...@@ -314,6 +333,7 @@ static void dnodeSaveMnodeIpList() {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMnodeInfos.nodeInfos[i].nodeId); len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMnodeInfos.nodeInfos[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeIp\": \"%s\",\n", taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp)); len += snprintf(content + len, maxLen - len, " \"nodeIp\": \"%s\",\n", taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp));
len += snprintf(content + len, maxLen - len, " \"nodePort\": %u,\n", tsMnodeInfos.nodeInfos[i].nodePort); len += snprintf(content + len, maxLen - len, " \"nodePort\": %u,\n", tsMnodeInfos.nodeInfos[i].nodePort);
len += snprintf(content + len, maxLen - len, " \"syncPort\": %u,\n", tsMnodeInfos.nodeInfos[i].syncPort);
len += snprintf(content + len, maxLen - len, " \"nodeName\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeName); len += snprintf(content + len, maxLen - len, " \"nodeName\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeName);
if (i < tsMnodeInfos.nodeNum -1) { if (i < tsMnodeInfos.nodeNum -1) {
len += snprintf(content + len, maxLen - len, " },{\n"); len += snprintf(content + len, maxLen - len, " },{\n");
...@@ -331,10 +351,10 @@ static void dnodeSaveMnodeIpList() { ...@@ -331,10 +351,10 @@ static void dnodeSaveMnodeIpList() {
} }
uint32_t dnodeGetMnodeMasteIp() { uint32_t dnodeGetMnodeMasteIp() {
return tsMnodeIpList.ip[tsMnodeIpList.inUse]; return tsMnodeIpSet.ip[tsMnodeIpSet.inUse];
} }
void* dnodeGetMnodeList() { void* dnodeGetMnodeInfos() {
return &tsMnodeInfos; return &tsMnodeInfos;
} }
...@@ -358,9 +378,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -358,9 +378,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
return; return;
} }
strcpy(pStatus->dnodeName, tsDnodeName); //strcpy(pStatus->dnodeName, tsDnodeName);
pStatus->version = htonl(tsVersion); pStatus->version = htonl(tsVersion);
pStatus->dnodeId = htonl(tsDnodeId); pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId);
pStatus->privateIp = htonl(inet_addr(tsPrivateIp)); pStatus->privateIp = htonl(inet_addr(tsPrivateIp));
pStatus->publicIp = htonl(inet_addr(tsPublicIp)); pStatus->publicIp = htonl(inet_addr(tsPublicIp));
pStatus->lastReboot = htonl(tsRebootTime); pStatus->lastReboot = htonl(tsRebootTime);
...@@ -368,7 +388,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -368,7 +388,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
pStatus->numOfCores = htons((uint16_t) tsNumOfCores); pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB; pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole; pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
vnodeBuildStatusMsg(pStatus); vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes); pStatus->openVnodes = htons(pStatus->openVnodes);
...@@ -382,47 +402,81 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -382,47 +402,81 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
dnodeSendMsgToMnode(&rpcMsg); dnodeSendMsgToMnode(&rpcMsg);
} }
static void dnodeReadDnodeInfo() { static bool dnodeReadDnodeCfg() {
char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeIdFile, "r"); FILE *fp = fopen(dnodeCfgFile, "r");
if (!fp) return; if (!fp) {
dTrace("failed to read dnodeCfg.json, file not exist");
char option[32] = {0}; return false;
int32_t value = 0; }
int32_t num = 0;
num = fscanf(fp, "%s %d", option, &value);
if (num != 2) return;
if (strcmp(option, "dnodeId") != 0) return;
tsDnodeId = value;;
bool ret = false;
int maxLen = 100;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read dnodeCfg.json, content is null");
return false;
}
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read dnodeCfg.json, invalid json format");
goto PARSE_CFG_OVER;
}
cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read dnodeCfg.json, dnodeId not found");
goto PARSE_CFG_OVER;
}
tsDnodeCfg.dnodeId = dnodeId->valueint;
ret = true;
dPrint("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId);
PARSE_CFG_OVER:
free(content);
cJSON_Delete(root);
fclose(fp); fclose(fp);
dPrint("read dnodeId:%d successed", tsDnodeId); return ret;
} }
static void dnodeSaveDnodeInfo() { static void dnodeSaveDnodeCfg() {
char dnodeIdFile[TSDB_FILENAME_LEN] = {0}; char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeIdFile, "%s/dnodeId", tsDnodeDir); sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeIdFile, "w"); FILE *fp = fopen(dnodeCfgFile, "w");
if (!fp) return; if (!fp) return;
fprintf(fp, "dnodeId %d\n", tsDnodeId); int32_t len = 0;
int32_t maxLen = 100;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d\n", tsDnodeCfg.dnodeId);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fclose(fp); fclose(fp);
free(content);
dPrint("save dnodeId successed"); dPrint("save dnodeId successed");
} }
void dnodeUpdateDnodeInfo(int32_t dnodeId) { void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) {
if (tsDnodeId == 0) { if (tsDnodeCfg.dnodeId == 0) {
dPrint("dnodeId is set to %d", dnodeId); dPrint("dnodeId is set to %d", pCfg->dnodeId);
tsDnodeId = dnodeId; tsDnodeCfg.dnodeId = pCfg->dnodeId;
dnodeSaveDnodeInfo(); dnodeSaveDnodeCfg();
} }
} }
int32_t dnodeGetDnodeId() { int32_t dnodeGetDnodeId() {
return tsDnodeId; return tsDnodeCfg.dnodeId;
} }
\ No newline at end of file
...@@ -43,7 +43,7 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code); ...@@ -43,7 +43,7 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code);
bool dnodeIsFirstDeploy(); bool dnodeIsFirstDeploy();
uint32_t dnodeGetMnodeMasteIp(); uint32_t dnodeGetMnodeMasteIp();
void * dnodeGetMnodeList(); void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -521,12 +521,6 @@ typedef struct { ...@@ -521,12 +521,6 @@ typedef struct {
uint8_t reserved[5]; uint8_t reserved[5];
} SVnodeLoad; } SVnodeLoad;
typedef struct {
uint32_t vnode;
uint8_t accessState;
uint8_t reserved[3];
} SVnodeAccess;
/* /*
* NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN / 4 * NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN / 4
*/ */
...@@ -571,12 +565,30 @@ typedef struct { ...@@ -571,12 +565,30 @@ typedef struct {
char reserved[64]; char reserved[64];
} SVnodeStatisticInfo; } SVnodeStatisticInfo;
typedef struct {
int32_t vgId;
int8_t accessState;
} SDMVgroupAccess;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
uint32_t moduleStatus; uint32_t moduleStatus;
uint32_t createdTime;
uint32_t numOfVnodes; uint32_t numOfVnodes;
} SDnodeState; } SDMDnodeCfg;
typedef struct {
int32_t nodeId;
uint32_t nodeIp;
uint16_t nodePort;
uint16_t syncPort;
char nodeName[TSDB_NODE_NAME_LEN + 1];
} SDMMnodeInfo;
typedef struct {
int8_t inUse;
int8_t nodeNum;
SDMMnodeInfo nodeInfos[TSDB_MAX_MPEERS];
} SDMMnodeInfos;
typedef struct { typedef struct {
uint32_t version; uint32_t version;
...@@ -596,22 +608,9 @@ typedef struct { ...@@ -596,22 +608,9 @@ typedef struct {
} SDMStatusMsg; } SDMStatusMsg;
typedef struct { typedef struct {
int32_t nodeId; SDMMnodeInfos mnodes;
uint32_t nodeIp; SDMDnodeCfg dnodeCfg;
uint16_t nodePort; SDMVgroupAccess vgAccess[];
char nodeName[TSDB_NODE_NAME_LEN + 1];
} SDMNodeInfo;
typedef struct {
int8_t inUse;
int8_t nodeNum;
SDMNodeInfo nodeInfos[TSDB_MAX_MPEERS];
} SDMNodeInfos;
typedef struct {
SDMNodeInfos mnodes;
SDnodeState dnodeState;
SVnodeAccess vnodeAccess[];
} SDMStatusRsp; } SDMStatusRsp;
typedef struct { typedef struct {
......
...@@ -104,10 +104,10 @@ extern char *syncRole[]; ...@@ -104,10 +104,10 @@ extern char *syncRole[];
extern int tsMaxSyncNum; extern int tsMaxSyncNum;
extern int tsSyncTcpThreads; extern int tsSyncTcpThreads;
extern int tsMaxWatchFiles; extern int tsMaxWatchFiles;
extern short tsSyncPort;
extern int tsSyncTimer; extern int tsSyncTimer;
extern int tsMaxFwdInfo; extern int tsMaxFwdInfo;
extern int sDebugFlag; extern int sDebugFlag;
extern uint16_t tsSyncPort;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -41,7 +41,7 @@ void mgmtReleaseMnode(struct SMnodeObj *pMnode); ...@@ -41,7 +41,7 @@ void mgmtReleaseMnode(struct SMnodeObj *pMnode);
char * mgmtGetMnodeRoleStr(); char * mgmtGetMnodeRoleStr();
void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp); void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp);
void mgmtGetMnodeList(void *mnodes); void mgmtGetMnodeInfos(void *mnodes);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -68,7 +68,7 @@ static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) { ...@@ -68,7 +68,7 @@ static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) {
pDnode->mnodeDnodePort = tsMnodeDnodePort; pDnode->mnodeDnodePort = tsMnodeDnodePort;
pDnode->dnodeShellPort = tsDnodeShellPort; pDnode->dnodeShellPort = tsDnodeShellPort;
pDnode->dnodeMnodePort = tsDnodeMnodePort; pDnode->dnodeMnodePort = tsDnodeMnodePort;
pDnode->syncPort = 0; pDnode->syncPort = tsSyncPort;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -274,12 +274,12 @@ static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { ...@@ -274,12 +274,12 @@ static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) {
void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SDMStatusMsg *pStatus = rpcMsg->pCont; SDMStatusMsg *pStatus = rpcMsg->pCont;
pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->privateIp = htonl(pStatus->privateIp); pStatus->privateIp = htonl(pStatus->privateIp);
pStatus->publicIp = htonl(pStatus->publicIp); pStatus->publicIp = htonl(pStatus->publicIp);
pStatus->moduleStatus = htonl(pStatus->moduleStatus); pStatus->moduleStatus = htonl(pStatus->moduleStatus);
pStatus->lastReboot = htonl(pStatus->lastReboot); pStatus->lastReboot = htonl(pStatus->lastReboot);
pStatus->numOfCores = htons(pStatus->numOfCores); pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
uint32_t version = htonl(pStatus->version); uint32_t version = htonl(pStatus->version);
...@@ -346,19 +346,18 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -346,19 +346,18 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mgmtReleaseDnode(pDnode); mgmtReleaseDnode(pDnode);
int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess); int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SDMVgroupAccess);
SDMStatusRsp *pRsp = rpcMallocCont(contLen); SDMStatusRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) { if (pRsp == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SERV_OUT_OF_MEMORY); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SERV_OUT_OF_MEMORY);
return; return;
} }
mgmtGetMnodeList(&pRsp->mnodes); mgmtGetMnodeInfos(&pRsp->mnodes);
pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); pRsp->dnodeCfg.dnodeId = htonl(pDnode->dnodeId);
pRsp->dnodeState.moduleStatus = htonl((int32_t)pDnode->isMgmt); pRsp->dnodeCfg.moduleStatus = htonl((int32_t)pDnode->isMgmt);
pRsp->dnodeState.createdTime = htonl(pDnode->createdTime / 1000); pRsp->dnodeCfg.numOfVnodes = 0;
pRsp->dnodeState.numOfVnodes = 0;
contLen = sizeof(SDMStatusRsp); contLen = sizeof(SDMStatusRsp);
......
...@@ -195,8 +195,8 @@ void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp) { ...@@ -195,8 +195,8 @@ void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp) {
} }
} }
void mgmtGetMnodeList(void *param) { void mgmtGetMnodeInfos(void *param) {
SDMNodeInfos *mnodes = param; SDMMnodeInfos *mnodes = param;
mnodes->inUse = 0; mnodes->inUse = 0;
int32_t index = 0; int32_t index = 0;
...@@ -209,6 +209,7 @@ void mgmtGetMnodeList(void *param) { ...@@ -209,6 +209,7 @@ void mgmtGetMnodeList(void *param) {
mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId); mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId);
mnodes->nodeInfos[index].nodeIp = htonl(pMnode->pDnode->privateIp); mnodes->nodeInfos[index].nodeIp = htonl(pMnode->pDnode->privateIp);
mnodes->nodeInfos[index].nodePort = htons(pMnode->pDnode->mnodeDnodePort); mnodes->nodeInfos[index].nodePort = htons(pMnode->pDnode->mnodeDnodePort);
mnodes->nodeInfos[index].syncPort = htons(pMnode->pDnode->syncPort);
strcpy(mnodes->nodeInfos[index].nodeName, pMnode->pDnode->dnodeName); strcpy(mnodes->nodeInfos[index].nodeName, pMnode->pDnode->dnodeName);
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
mnodes->inUse = index; mnodes->inUse = index;
......
...@@ -146,7 +146,7 @@ static int32_t sdbInitWal() { ...@@ -146,7 +146,7 @@ static int32_t sdbInitWal() {
} }
sdbTrace("open sdb wal for restore"); sdbTrace("open sdb wal for restore");
walRestore(tsSdbObj.wal, &tsSdbObj, sdbWrite); walRestore(tsSdbObj.wal, NULL, sdbWrite);
return 0; return 0;
} }
...@@ -174,12 +174,12 @@ void sdbUpdateMnodeRoles() { ...@@ -174,12 +174,12 @@ void sdbUpdateMnodeRoles() {
SNodesRole roles = {0}; SNodesRole roles = {0};
syncGetNodesRole(tsSdbObj.sync, &roles); syncGetNodesRole(tsSdbObj.sync, &roles);
mPrint("update mnodes:%d sync roles", tsSdbObj.cfg.replica); sdbPrint("update mnodes:%d sync roles", tsSdbObj.cfg.replica);
for (int32_t i = 0; i < tsSdbObj.cfg.replica; ++i) { for (int32_t i = 0; i < tsSdbObj.cfg.replica; ++i) {
SMnodeObj *pMnode = mgmtGetMnode(roles.nodeId[i]); SMnodeObj *pMnode = mgmtGetMnode(roles.nodeId[i]);
if (pMnode != NULL) { if (pMnode != NULL) {
pMnode->role = roles.role[i]; pMnode->role = roles.role[i];
mPrint("mnode:%d, role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role)); sdbPrint("mnode:%d, role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role));
mgmtReleaseMnode(pMnode); mgmtReleaseMnode(pMnode);
} }
} }
...@@ -196,7 +196,7 @@ static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) { ...@@ -196,7 +196,7 @@ static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) {
} }
static void sdbNotifyRole(void *ahandle, int8_t role) { static void sdbNotifyRole(void *ahandle, int8_t role) {
mPrint("mnode role changed from %s to %s", mgmtGetMnodeRoleStr(tsSdbObj.role), mgmtGetMnodeRoleStr(role)); sdbPrint("mnode role changed from %s to %s", mgmtGetMnodeRoleStr(tsSdbObj.role), mgmtGetMnodeRoleStr(role));
if (role == TAOS_SYNC_ROLE_MASTER && tsSdbObj.role != TAOS_SYNC_ROLE_MASTER) { if (role == TAOS_SYNC_ROLE_MASTER && tsSdbObj.role != TAOS_SYNC_ROLE_MASTER) {
balanceReset(); balanceReset();
...@@ -208,8 +208,8 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { ...@@ -208,8 +208,8 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
tsSdbObj.code = code; tsSdbObj.code = code;
sdbTrace("sdb forward request confirmed, result:%s", tstrerror(code));
sem_post(&tsSdbObj.sem); sem_post(&tsSdbObj.sem);
mPrint("sdb forward request confirmed, result:%s", tstrerror(code));
} }
static int32_t sdbForwardToPeer(void *pHead) { static int32_t sdbForwardToPeer(void *pHead) {
...@@ -227,9 +227,9 @@ void sdbUpdateSync() { ...@@ -227,9 +227,9 @@ void sdbUpdateSync() {
SSyncCfg syncCfg = {0}; SSyncCfg syncCfg = {0};
int32_t index = 0; int32_t index = 0;
SDMNodeInfos *mnodes = dnodeGetMnodeList(); SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
for (int32_t i = 0; i < mnodes->nodeNum; ++i) { for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
SDMNodeInfo *node = &mnodes->nodeInfos[i]; SDMMnodeInfo *node = &mnodes->nodeInfos[i];
syncCfg.nodeInfo[i].nodeId = node->nodeId; syncCfg.nodeInfo[i].nodeId = node->nodeId;
syncCfg.nodeInfo[i].nodeIp = node->nodeIp; syncCfg.nodeInfo[i].nodeIp = node->nodeIp;
strcpy(syncCfg.nodeInfo[i].name, node->nodeName); strcpy(syncCfg.nodeInfo[i].name, node->nodeName);
...@@ -271,9 +271,9 @@ void sdbUpdateSync() { ...@@ -271,9 +271,9 @@ void sdbUpdateSync() {
if (!hasThisDnode) return; if (!hasThisDnode) return;
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return; if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return;
mPrint("work as mnode, replica:%d arbitratorIp:%s", syncCfg.replica, taosIpStr(syncCfg.arbitratorIp)); sdbPrint("work as mnode, replica:%d arbitratorIp:%s", syncCfg.replica, taosIpStr(syncCfg.arbitratorIp));
for (int32_t i = 0; i < syncCfg.replica; ++i) { for (int32_t i = 0; i < syncCfg.replica; ++i) {
mPrint("mnode:%d, ip:%s name:%s", syncCfg.nodeInfo[i].nodeId, taosIpStr(syncCfg.nodeInfo[i].nodeIp), sdbPrint("mnode:%d, ip:%s name:%s", syncCfg.nodeInfo[i].nodeId, taosIpStr(syncCfg.nodeInfo[i].nodeIp),
syncCfg.nodeInfo[i].name); syncCfg.nodeInfo[i].name);
} }
...@@ -476,9 +476,13 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -476,9 +476,13 @@ static int sdbWrite(void *param, void *data, int type) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
// from app, oper is created // from app, oper is created
if (param == NULL) return code; if (param != NULL) return code;
// from wal or forward msg, should create oper
if (tsSdbObj.sync != NULL) {
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
}
// from wal, should create oper
if (action == SDB_ACTION_INSERT) { if (action == SDB_ACTION_INSERT) {
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
code = (*pTable->decodeFp)(&oper); code = (*pTable->decodeFp)(&oper);
...@@ -529,7 +533,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) { ...@@ -529,7 +533,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
(*pTable->encodeFp)(pOper); (*pTable->encodeFp)(pOper);
pHead->len = pOper->rowSize; pHead->len = pOper->rowSize;
int32_t code = sdbWrite(NULL, pHead, pHead->msgType); int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
taosFreeQitem(pHead); taosFreeQitem(pHead);
if (code < 0) return code; if (code < 0) return code;
} }
...@@ -571,7 +575,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -571,7 +575,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
memcpy(pHead->cont, pOper->pObj, rowSize); memcpy(pHead->cont, pOper->pObj, rowSize);
int32_t code = sdbWrite(NULL, pHead, pHead->msgType); int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
taosFreeQitem(pHead); taosFreeQitem(pHead);
if (code < 0) return code; if (code < 0) return code;
} }
...@@ -602,7 +606,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { ...@@ -602,7 +606,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
(*pTable->encodeFp)(pOper); (*pTable->encodeFp)(pOper);
pHead->len = pOper->rowSize; pHead->len = pOper->rowSize;
int32_t code = sdbWrite(NULL, pHead, pHead->msgType); int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
taosFreeQitem(pHead); taosFreeQitem(pHead);
if (code < 0) return code; if (code < 0) return code;
} }
......
...@@ -93,7 +93,7 @@ echo "privateIp $NODE_IP" >> $TAOS_CFG ...@@ -93,7 +93,7 @@ echo "privateIp $NODE_IP" >> $TAOS_CFG
echo "dDebugFlag 199" >> $TAOS_CFG echo "dDebugFlag 199" >> $TAOS_CFG
echo "mDebugFlag 199" >> $TAOS_CFG echo "mDebugFlag 199" >> $TAOS_CFG
echo "sdbDebugFlag 199" >> $TAOS_CFG echo "sdbDebugFlag 199" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG echo "rpcDebugFlag 131" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 131" >> $TAOS_CFG echo "httpDebugFlag 131" >> $TAOS_CFG
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册