提交 71d3106d 编写于 作者: J jtao1735

first version

上级 7acd7378
......@@ -385,7 +385,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pMsg += sizeof(SMgmtHead);
SCMCfgDnodeMsg* pCfg = (SCMCfgDnodeMsg*)pMsg;
strncpy(pCfg->ip, pDCL->a[0].z, pDCL->a[0].n);
strncpy(pCfg->ep, pDCL->a[0].z, pDCL->a[0].n);
strncpy(pCfg->config, pDCL->a[1].z, pDCL->a[1].n);
......
......@@ -49,11 +49,11 @@ static void tscSetDnodeIpList(SSqlObj* pSql, STableMeta* pTableMeta) {
SRpcIpSet* pIpList = &pSql->ipList;
pIpList->numOfIps = pTableMeta->vgroupInfo.numOfIps;
pIpList->port = tsDnodeShellPort;
pIpList->inUse = 0;
for(int32_t i = 0; i < pTableMeta->vgroupInfo.numOfIps; ++i) {
pIpList->ip[i] = pTableMeta->vgroupInfo.ipAddr[i].ip;
strcpy(pIpList->fqdn[i], pTableMeta->vgroupInfo.ipAddr[i].fqdn);
pIpList->port[i] = pTableMeta->vgroupInfo.ipAddr[i].port;
}
}
......@@ -62,7 +62,7 @@ void tscPrintMgmtIp() {
tscError("invalid mgmt IP list:%d", tscMgmtIpSet.numOfIps);
} else {
for (int i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpSet.ip[i]);
tscTrace("mgmt index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]);
}
}
}
......@@ -70,9 +70,8 @@ void tscPrintMgmtIp() {
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
tscMgmtIpSet.numOfIps = pIpList->numOfIps;
tscMgmtIpSet.inUse = pIpList->inUse;
tscMgmtIpSet.port = htons(pIpList->port);
for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
tscMgmtIpSet.ip[i] = htonl(pIpList->ip[i]);
tscMgmtIpSet.port[i] = htons(pIpList->port[i]);
}
}
......@@ -80,8 +79,7 @@ void tscSetMgmtIpListFromEdge() {
if (tscMgmtIpSet.numOfIps != 1) {
tscMgmtIpSet.numOfIps = 1;
tscMgmtIpSet.inUse = 0;
tscMgmtIpSet.port = tsMnodeShellPort;
tscMgmtIpSet.ip[0] = inet_addr(tsMasterIp);
taosGetFqdnPortFromEp(tsMaster, tscMgmtIpSet.fqdn[0], &tscMgmtIpSet.port[0]);
tscTrace("edge mgmt IP list:");
tscPrintMgmtIp();
}
......@@ -213,9 +211,6 @@ int tscSendMsgToServer(SSqlObj *pSql) {
rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg);
} else {
pSql->ipList = tscMgmtIpSet;
pSql->ipList.port = tsMnodeShellPort;
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
......@@ -224,6 +219,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql,
.code = 0
};
tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
rpcSendRequest(pObj->pMgmtConn, &pSql->ipList, &rpcMsg);
}
......@@ -664,11 +660,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index];
pSql->ipList.numOfIps = pVgroupInfo->numOfIps; // todo fix me
pSql->ipList.port = tsDnodeShellPort;
pSql->ipList.inUse = 0;
for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
pSql->ipList.ip[i] = pVgroupInfo->ipAddr[i].ip;
strcpy(pSql->ipList.fqdn[i], pVgroupInfo->ipAddr[i].fqdn);
pSql->ipList.port[i] = pVgroupInfo->ipAddr[i].port;
}
tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, index);
......@@ -935,7 +931,8 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SCMCreateDnodeMsg *pCreate = (SCMCreateDnodeMsg *)pCmd->payload;
strncpy(pCreate->ip, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
strncpy(pCreate->ep, pInfo->pDCLInfo->a[0].z, pInfo->pDCLInfo->a[0].n);
pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_DNODE;
return TSDB_CODE_SUCCESS;
......@@ -1078,7 +1075,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMDropDnodeMsg *pDrop = (SCMDropDnodeMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
strcpy(pDrop->ip, pTableMetaInfo->name);
strcpy(pDrop->ep, pTableMetaInfo->name);
pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_DNODE;
return TSDB_CODE_SUCCESS;
......@@ -1857,10 +1854,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
}
for (int i = 0; i < pMetaMsg->vgroup.numOfIps; ++i) {
pMetaMsg->vgroup.ipAddr[i].ip = htonl(pMetaMsg->vgroup.ipAddr[i].ip);
pMetaMsg->vgroup.ipAddr[i].port = htons(pMetaMsg->vgroup.ipAddr[i].port);
assert(pMetaMsg->vgroup.ipAddr[i].ip != 0);
}
SSchema* pSchema = pMetaMsg->schema;
......@@ -2144,7 +2138,6 @@ _error_clean:
assert(pVgroups->numOfIps >= 1);
for(int32_t j = 0; j < pVgroups->numOfIps; ++j) {
pVgroups->ipAddr[j].ip = htonl(pVgroups->ipAddr[j].ip);
pVgroups->ipAddr[j].port = htons(pVgroups->ipAddr[j].port);
}
}
......
......@@ -72,24 +72,24 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return NULL;
}
tscMgmtIpSet.numOfIps = 0;
if (ip && ip[0]) {
tscMgmtIpSet.inUse = 0;
tscMgmtIpSet.port = tsMnodeShellPort;
tscMgmtIpSet.numOfIps = 1;
tscMgmtIpSet.ip[0] = inet_addr(ip);
if (tsMasterIp[0] && strcmp(ip, tsMasterIp) != 0) {
tscMgmtIpSet.numOfIps = 2;
tscMgmtIpSet.ip[1] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpSet.fqdn[0], ip);
tscMgmtIpSet.port[0] = port? port: tsMnodeShellPort;
} else {
if (tsMaster[0] != 0) {
taosGetFqdnPortFromEp(tsMaster, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
tscMgmtIpSet.numOfIps++;
}
if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
tscMgmtIpSet.numOfIps = 3;
tscMgmtIpSet.ip[2] = inet_addr(tsSecondIp);
if (tsSecond[0] != 0) {
taosGetFqdnPortFromEp(tsSecond, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
tscMgmtIpSet.numOfIps++;
}
}
tscMgmtIpSet.port = port ? port : tsMnodeShellPort;
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
if (NULL == pObj) {
......@@ -167,10 +167,6 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
if (ip == NULL || (ip != NULL && (strcmp("127.0.0.1", ip) == 0 || strcasecmp("localhost", ip) == 0))) {
ip = tsMasterIp;
}
tscTrace("try to create a connection to %s", ip);
STscObj *pObj = taosConnectImpl(ip, user, pass, db, port, NULL, NULL, NULL);
......
......@@ -1245,7 +1245,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems;
tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql,
pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].ip, pTableMetaInfo->vgroupList->vgroups[0].vgId,
pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId,
numOfRowsFromSubquery, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
......@@ -1473,12 +1473,12 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex, pState->code);
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pState->code);
tscHandleSubqueryError(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete, ip:%u, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql,
pVgroup->ipAddr[0].ip, pVgroup->vgId, trsupport->subqueryIndex);
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
tscRetrieveFromDnodeCallBack(param, pSql, 0);
......
......@@ -55,7 +55,6 @@ int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) {
if (pVnodeConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads;
......@@ -76,7 +75,6 @@ int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) {
if (*pMgmtConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1;
......@@ -109,12 +107,6 @@ void taos_init_imp() {
deltaToUtcInitOnce();
if (tscEmbedded == 0) {
/*
* set localIp = 0
* means unset tsLocalIp in client
* except read from config file
*/
strcpy(tsLocalIp, "0.0.0.0");
// Read global configuration.
taosInitGlobalCfg();
......@@ -133,7 +125,7 @@ void taos_init_imp() {
taosPrintGlobalCfg();
tscTrace("starting to initialize TAOS client ...");
tscTrace("Local IP address is:%s", tsLocalIp);
tscTrace("Local End Point is:%s", tsLocalEp);
}
taosSetCoreDump();
......@@ -143,13 +135,12 @@ void taos_init_imp() {
}
tscMgmtIpSet.inUse = 0;
tscMgmtIpSet.port = tsMnodeShellPort;
tscMgmtIpSet.numOfIps = 1;
tscMgmtIpSet.ip[0] = inet_addr(tsMasterIp);
taosGetFqdnPortFromEp(tsMaster, tscMgmtIpSet.fqdn[0], &tscMgmtIpSet.port[0]);
if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
if (tsSecond[0] && strcmp(tsSecond, tsMaster) != 0) {
tscMgmtIpSet.numOfIps = 2;
tscMgmtIpSet.ip[1] = inet_addr(tsSecondIp);
taosGetFqdnPortFromEp(tsSecond, tscMgmtIpSet.fqdn[1], &tscMgmtIpSet.port[1]);
}
tscInitMsgsFp();
......
......@@ -51,8 +51,10 @@ extern int32_t tsVersion;
extern int32_t tscEmbedded;
extern int64_t tsMsPerDay[2];
extern char tsMasterIp[];
extern char tsSecondIp[];
extern char tsMaster[];
extern char tsSecond[];
extern char tsLocalEp[];
extern uint16_t tsServerPort;
extern uint16_t tsMnodeDnodePort;
extern uint16_t tsMnodeShellPort;
extern uint16_t tsDnodeShellPort;
......@@ -178,7 +180,8 @@ void taosInitGlobalCfg();
bool taosCheckGlobalCfg();
void taosSetAllDebugFlag();
bool taosCfgDynamicOptions(char *msg);
int taosGetFqdnPortFromEp(char *ep, char *fqdn, uint16_t *port);
#ifdef __cplusplus
}
#endif
......
......@@ -60,8 +60,10 @@ int32_t tscEmbedded = 0;
*/
int64_t tsMsPerDay[] = {86400000L, 86400000000L};
char tsMasterIp[TSDB_IPv4ADDR_LEN] = {0};
char tsSecondIp[TSDB_IPv4ADDR_LEN] = {0};
char tsMaster[TSDB_FQDN_LEN] = {0};
char tsSecond[TSDB_FQDN_LEN] = {0};
char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port
uint16_t tsServerPort = 6030;
uint16_t tsMnodeShellPort = 6030; // udp[6030-6034] tcp[6030]
uint16_t tsDnodeShellPort = 6035; // udp[6035-6039] tcp[6035]
uint16_t tsMnodeDnodePort = 6040; // udp/tcp
......@@ -70,8 +72,6 @@ uint16_t tsSyncPort = 6050;
int32_t tsStatusInterval = 1; // second
int32_t tsShellActivityTimer = 3; // second
int32_t tsVnodePeerHBTimer = 1; // second
int32_t tsMgmtPeerHBTimer = 1; // second
int32_t tsMeterMetaKeepTimer = 7200; // second
int32_t tsMetricMetaKeepTimer = 600; // second
int32_t tsRpcTimer = 300;
......@@ -79,8 +79,6 @@ int32_t tsRpcMaxTime = 600; // seconds;
float tsNumOfThreadsPerCore = 1.0;
float tsRatioOfQueryThreads = 0.5;
char tsPublicIp[TSDB_IPv4ADDR_LEN] = {0};
char tsPrivateIp[TSDB_IPv4ADDR_LEN] = {0};
int16_t tsNumOfVnodesPerCore = 8;
int16_t tsNumOfTotalVnodes = TSDB_INVALID_VNODE_NUM;
......@@ -112,7 +110,6 @@ int32_t tsNumOfMPeers = 3;
int32_t tsMaxShellConns = 2000;
int32_t tsMaxTables = 100000;
char tsLocalIp[TSDB_IPv4ADDR_LEN] = {0};
char tsDefaultDB[TSDB_DB_NAME_LEN] = {0};
char tsDefaultUser[64] = "root";
char tsDefaultPass[64] = "taosdata";
......@@ -183,7 +180,6 @@ float tsStreamComputDelayRatio = 0.1;
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
char tsHttpIp[TSDB_IPv4ADDR_LEN] = "0.0.0.0";
uint16_t tsHttpPort = 6020; // only tcp, range tcp[6020]
// uint16_t tsNginxPort = 6060; //only tcp, range tcp[6060]
int32_t tsHttpCacheSessions = 100;
......@@ -194,7 +190,6 @@ int32_t tsHttpEnableRecordSql = 0;
int32_t tsTelegrafUseFieldNum = 0;
int32_t tsTscEnableRecordSql = 0;
int32_t tsAnyIp = 1;
uint32_t tsPublicIpInt = 0;
char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log";
......@@ -274,69 +269,29 @@ static void doInitGlobalConfig() {
SGlobalCfg cfg = {0};
// ip address
cfg.option = "masterIp";
cfg.ptr = tsMasterIp;
cfg.option = "master";
cfg.ptr = tsMaster;
cfg.valType = TAOS_CFG_VTYPE_IPSTR;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_IPv4ADDR_LEN;
cfg.ptrLength = TSDB_FQDN_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "secondIp";
cfg.ptr = tsSecondIp;
cfg.option = "second";
cfg.ptr = tsSecond;
cfg.valType = TAOS_CFG_VTYPE_IPSTR;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_IPv4ADDR_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "publicIp";
cfg.ptr = tsPublicIp;
cfg.valType = TAOS_CFG_VTYPE_IPSTR;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_IPv4ADDR_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "privateIp";
cfg.ptr = tsPrivateIp;
cfg.valType = TAOS_CFG_VTYPE_IPSTR;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_IPv4ADDR_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "localIp";
cfg.ptr = tsLocalIp;
cfg.valType = TAOS_CFG_VTYPE_IPSTR;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_IPv4ADDR_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "httpIp";
cfg.ptr = tsHttpIp;
cfg.valType = TAOS_CFG_VTYPE_IPSTR;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_IPv4ADDR_LEN;
cfg.ptrLength = TSDB_FQDN_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// port
cfg.option = "mnodeShellPort";
cfg.ptr = &tsMnodeShellPort;
cfg.option = "serverPort";
cfg.ptr = &tsServerPort;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 1;
......@@ -345,56 +300,6 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "dnodeShellPort";
cfg.ptr = &tsDnodeShellPort;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 1;
cfg.maxValue = 65535;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mnodeDnodePort";
cfg.ptr = &tsMnodeDnodePort;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1;
cfg.maxValue = 65535;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "dnodeMnodePort";
cfg.ptr = &tsDnodeMnodePort;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1;
cfg.maxValue = 65535;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// cfg.option = "syncPort";
// cfg.ptr = &syncPort;
// cfg.valType = TAOS_CFG_VTYPE_INT16;
// cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
// cfg.minValue = 1;
// cfg.maxValue = 65535;
// cfg.ptrLength = 0;
// cfg.unitType = TAOS_CFG_UTYPE_NONE;
// taosInitConfigOption(cfg);
cfg.option = "httpPort";
cfg.ptr = &tsHttpPort;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 1;
cfg.maxValue = 65535;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// directory
cfg.option = "configDir";
cfg.ptr = configDir;
......@@ -1227,16 +1132,6 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "anyIp";
cfg.ptr = &tsAnyIp;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// version info
cfg.option = "gitinfo";
cfg.ptr = gitinfo;
......@@ -1284,25 +1179,15 @@ void taosInitGlobalCfg() {
}
bool taosCheckGlobalCfg() {
if (tsPrivateIp[0] == 0) {
taosGetPrivateIp(tsPrivateIp);
}
if (tsPublicIp[0] == 0) {
strcpy(tsPublicIp, tsPrivateIp);
}
tsPublicIpInt = inet_addr(tsPublicIp);
if (tsLocalIp[0] == 0) {
strcpy(tsLocalIp, tsPrivateIp);
}
taosGetFqdn(tsLocalEp);
sprintf(tsLocalEp + strlen(tsLocalEp), ":%d", tsServerPort);
if (tsMasterIp[0] == 0) {
strcpy(tsMasterIp, tsPrivateIp);
if (tsMaster[0] == 0) {
strcpy(tsMaster, tsLocalEp);
}
if (tsSecondIp[0] == 0) {
strcpy(tsSecondIp, tsMasterIp);
if (tsSecond[0] == 0) {
strcpy(tsSecond, tsLocalEp);
}
taosGetSystemInfo();
......@@ -1324,15 +1209,6 @@ bool taosCheckGlobalCfg() {
tsNumOfTotalVnodes = tsNumOfTotalVnodes < TSDB_MIN_VNODES ? TSDB_MIN_VNODES : tsNumOfTotalVnodes;
}
if (strlen(tsPrivateIp) == 0) {
uError("privateIp is null");
return false;
}
if (tscEmbedded) {
strcpy(tsLocalIp, tsPrivateIp);
}
// todo refactor
tsVersion = 0;
for (int i = 0; i < 10; i++) {
......@@ -1345,5 +1221,26 @@ bool taosCheckGlobalCfg() {
tsVersion = 10 * tsVersion;
tsMnodeShellPort = tsServerPort + TSDB_PORT_MNODESHELL; // udp[6030-6034] tcp[6030]
tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035]
tsMnodeDnodePort = tsServerPort + TSDB_PORT_MNODEDNODE; // udp/tcp
tsDnodeMnodePort = tsServerPort + TSDB_PORT_DNODEMNODE; // udp/tcp
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
return true;
}
int taosGetFqdnPortFromEp(char *ep, char *fqdn, uint16_t *port) {
*port = 0;
strcpy(fqdn, ep);
char *temp = strchr(fqdn, ':');
if (temp) {
*temp = 0;
*port = atoi(temp+1);
}
if (*port == 0) *port = tsServerPort;
return 0;
}
......@@ -59,16 +59,13 @@ void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
tsMnodeIpSet = *pIpSet;
}
void dnodeGetMnodeIpSet(void *ipSetRaw, bool usePublicIp) {
void dnodeGetMnodeDnodeIpSet(void *ipSetRaw) {
SRpcIpSet *ipSet = ipSetRaw;
ipSet->numOfIps = tsMnodeInfos.nodeNum;
ipSet->inUse = tsMnodeInfos.inUse;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) {
if (usePublicIp) {
ipSet->ip[i] = tsMnodeInfos.nodeInfos[i].nodeIp;
} else {
ipSet->ip[i] = tsMnodeInfos.nodeInfos[i].nodeIp;
}
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]);
ipSet->port[i] += TSDB_PORT_MNODEDNODE;
}
}
......@@ -85,26 +82,23 @@ int32_t dnodeInitMClient() {
if (!dnodeReadMnodeInfos()) {
memset(&tsMnodeIpSet, 0, sizeof(SRpcIpSet));
memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos));
tsMnodeIpSet.port = tsMnodeDnodePort;
tsMnodeIpSet.numOfIps = 1;
tsMnodeIpSet.ip[0] = inet_addr(tsMasterIp);
if (strcmp(tsSecondIp, tsMasterIp) != 0) {
taosGetFqdnPortFromEp(tsMaster, tsMnodeIpSet.fqdn[0], &tsMnodeIpSet.port[0]);
tsMnodeIpSet.port[0] += TSDB_PORT_MNODEDNODE;
if (strcmp(tsSecond, tsMaster) != 0) {
tsMnodeIpSet.numOfIps = 2;
tsMnodeIpSet.ip[1] = inet_addr(tsSecondIp);
taosGetFqdnPortFromEp(tsSecond, tsMnodeIpSet.fqdn[1], &tsMnodeIpSet.port[1]);
}
} else {
tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
tsMnodeIpSet.port = tsMnodeInfos.nodeInfos[0].nodePort;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
tsMnodeIpSet.ip[i] = tsMnodeInfos.nodeInfos[i].nodeIp;
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
}
}
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localPort = 0;
rpcInit.label = "DND-MC";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessRspFromMnode;
......@@ -182,9 +176,6 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId);
pMnodeInfo->nodeIp = htonl(pMnodeInfo->nodeIp);
pMnodeInfo->nodePort = htons(pMnodeInfo->nodePort);
pMnodeInfo->syncPort = htons(pMnodeInfo->syncPort);
}
SDMVgroupAccess *pVgAcccess = pStatusRsp->vgAccess;
......@@ -207,15 +198,14 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
tsMnodeIpSet.port = tsMnodeInfos.nodeInfos[0].nodePort;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
tsMnodeIpSet.ip[i] = tsMnodeInfos.nodeInfos[i].nodeIp;
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
tsMnodeIpSet.port[i] += TSDB_PORT_MNODEDNODE;
}
dPrint("mnodes is changed, nodeNum:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId, taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp),
tsMnodeInfos.nodeInfos[i].nodePort, tsMnodeInfos.nodeInfos[i].nodeName);
dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
}
dnodeSaveMnodeInfos();
......@@ -291,42 +281,19 @@ static bool dnodeReadMnodeInfos() {
}
tsMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint;
cJSON *nodeIp = cJSON_GetObjectItem(nodeInfo, "nodeIp");
if (!nodeIp || nodeIp->type != cJSON_String || nodeIp->valuestring == NULL) {
dError("failed to read mnode mgmtIpList.json, nodeIp not found");
goto PARSE_OVER;
}
tsMnodeInfos.nodeInfos[i].nodeIp = inet_addr(nodeIp->valuestring);
cJSON *nodePort = cJSON_GetObjectItem(nodeInfo, "nodePort");
if (!nodePort || nodePort->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, nodePort not found");
goto PARSE_OVER;
}
tsMnodeInfos.nodeInfos[i].nodePort = (uint16_t)nodePort->valueint;
cJSON *syncPort = cJSON_GetObjectItem(nodeInfo, "syncPort");
if (!syncPort || syncPort->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, syncPort not found");
goto PARSE_OVER;
}
tsMnodeInfos.nodeInfos[i].syncPort = (uint16_t)syncPort->valueint;
cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName");
if (!nodeName || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("failed to read mnode mgmtIpList.json, nodeName not found");
goto PARSE_OVER;
}
strncpy(tsMnodeInfos.nodeInfos[i].nodeName, nodeName->valuestring, TSDB_NODE_NAME_LEN);
}
strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_FQDN_LEN);
}
ret = true;
dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
dPrint("mnode:%d, ip:%s:%u name:%s", tsMnodeInfos.nodeInfos[i].nodeId,
taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp), tsMnodeInfos.nodeInfos[i].nodePort,
tsMnodeInfos.nodeInfos[i].nodeName);
dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
}
PARSE_OVER:
......@@ -352,10 +319,7 @@ static void dnodeSaveMnodeInfos() {
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMnodeInfos.nodeInfos[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeIp\": \"%s\",\n", taosIpStr(tsMnodeInfos.nodeInfos[i].nodeIp));
len += snprintf(content + len, maxLen - len, " \"nodePort\": %u,\n", tsMnodeInfos.nodeInfos[i].nodePort);
len += snprintf(content + len, maxLen - len, " \"syncPort\": %u,\n", tsMnodeInfos.nodeInfos[i].syncPort);
len += snprintf(content + len, maxLen - len, " \"nodeName\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeName);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeEp);
if (i < tsMnodeInfos.nodeNum -1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
......@@ -371,8 +335,8 @@ static void dnodeSaveMnodeInfos() {
dPrint("save mnode iplist successed");
}
uint32_t dnodeGetMnodeMasteIp() {
return tsMnodeIpSet.ip[tsMnodeIpSet.inUse];
char *dnodeGetMnodeMasterEp() {
return tsMnodeInfos.nodeInfos[tsMnodeIpSet.inUse].nodeEp;
}
void* dnodeGetMnodeInfos() {
......@@ -402,8 +366,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
//strcpy(pStatus->dnodeName, tsDnodeName);
pStatus->version = htonl(tsVersion);
pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId);
pStatus->privateIp = htonl(inet_addr(tsPrivateIp));
pStatus->publicIp = htonl(inet_addr(tsPublicIp));
strcpy(pStatus->dnodeEp, tsLocalEp);
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
......@@ -500,4 +463,4 @@ void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) {
int32_t dnodeGetDnodeId() {
return tsDnodeCfg.dnodeId;
}
\ No newline at end of file
}
......@@ -161,8 +161,7 @@ static int32_t dnodeInitSystem() {
}
taosPrintGlobalCfg();
dPrint("Server IP address is:%s", tsPrivateIp);
dPrint("starting to initialize TDengine ...");
dPrint("start to initialize TDengine on %s", tsLocalEp);
if (dnodeInitStorage() != 0) return -1;
if (dnodeInitRead() != 0) return -1;
......@@ -237,5 +236,5 @@ static int32_t dnodeInitStorage() {
static void dnodeCleanupStorage() {}
bool dnodeIsFirstDeploy() {
return strcmp(tsMasterIp, tsPrivateIp) == 0;
}
\ No newline at end of file
return strcmp(tsMaster, tsLocalEp) == 0;
}
......@@ -139,11 +139,9 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
pCreate->nodes[j].nodeIp = htonl(pCreate->nodes[j].nodeIp);
}
void *pVnode = vnodeAccquireVnode(pCreate->cfg.vgId);
......
......@@ -38,7 +38,6 @@ int32_t dnodeInitMnode() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localPort = tsDnodeMnodePort;
rpcInit.label = "DND-MS";
rpcInit.numOfThreads = 1;
......
......@@ -47,7 +47,6 @@ int32_t dnodeInitShell() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localPort = tsDnodeShellPort;
rpcInit.label = "DND-shell";
rpcInit.numOfThreads = numOfThreads;
......
......@@ -42,8 +42,8 @@ void dnodeFreeRqueue(void *rqueue);
void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code);
bool dnodeIsFirstDeploy();
uint32_t dnodeGetMnodeMasteIp();
void dnodeGetMnodeIpSet(void *ipSet, bool usePublicIp);
char *dnodeGetMnodeMasterEp();
void dnodeGetMnodeDnodeIpSet(void *ipSet);
void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId();
......
......@@ -217,8 +217,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_DNODE_ROLE_MGMT 1
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_MPEERS 5
#define TSDB_MAX_MGMT_IPS (TSDB_MAX_MPEERS+1)
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_METERMETA_MAX_NUM 100000 // maximum batch size allowed to load metermeta
......@@ -319,6 +318,12 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_NORMAL_TABLES 1000
#define TSDB_MAX_CHILD_TABLES 100000
#define TSDB_PORT_MNODESHELL 0
#define TSDB_PORT_DNODESHELL 5
#define TSDB_PORT_DNODEMNODE 10
#define TSDB_PORT_MNODEDNODE 15
#define TSDB_PORT_SYNC 20
typedef enum {
TSDB_PRECISION_MILLI,
TSDB_PRECISION_MICRO,
......
......@@ -549,24 +549,19 @@ typedef struct {
typedef struct {
int32_t nodeId;
uint32_t nodeIp;
uint16_t nodePort;
uint16_t syncPort;
char nodeName[TSDB_NODE_NAME_LEN + 1];
char nodeEp[TSDB_FQDN_LEN];
} SDMMnodeInfo;
typedef struct {
int8_t inUse;
int8_t nodeNum;
SDMMnodeInfo nodeInfos[TSDB_MAX_MPEERS];
SDMMnodeInfo nodeInfos[TSDB_MAX_REPLICA];
} SDMMnodeInfos;
typedef struct {
uint32_t version;
int32_t dnodeId;
char dnodeName[TSDB_NODE_NAME_LEN + 1];
uint32_t privateIp;
uint32_t publicIp;
char dnodeEp[TSDB_FQDN_LEN];
uint32_t moduleStatus;
uint32_t lastReboot; // time stamp for last reboot
uint16_t numOfTotalVnodes; // from config file
......@@ -603,19 +598,17 @@ typedef struct {
int8_t replications;
int8_t wals;
int8_t quorum;
uint32_t arbitratorIp;
int8_t reserved[16];
} SMDVnodeCfg;
typedef struct {
int32_t nodeId;
uint32_t nodeIp;
char nodeName[TSDB_NODE_NAME_LEN + 1];
char nodeEp[TSDB_FQDN_LEN];
} SMDVnodeDesc;
typedef struct {
SMDVnodeCfg cfg;
SMDVnodeDesc nodes[TSDB_MAX_MPEERS];
SMDVnodeDesc nodes[TSDB_MAX_REPLICA];
} SMDCreateVnodeMsg;
typedef struct {
......@@ -738,7 +731,7 @@ typedef struct SCMShowRsp {
} SCMShowRsp;
typedef struct {
char ip[32];
char ep[TSDB_FQDN_LEN]; // end point, hostname:port
} SCMCreateDnodeMsg, SCMDropDnodeMsg;
typedef struct {
......@@ -753,7 +746,7 @@ typedef struct {
} SDMConfigVnodeMsg;
typedef struct {
char ip[32];
char ep[TSDB_FQDN_LEN]; // end point, hostname:port
char config[64];
} SMDCfgDnodeMsg, SCMCfgDnodeMsg;
......
......@@ -31,8 +31,8 @@ extern int tsRpcHeadSize;
typedef struct {
int8_t inUse;
int8_t numOfIps;
uint16_t port[TSDB_MAX_MPEERS];
char fqdn[TSDB_MAX_MPEERS][TSDB_FQDN_LEN];
uint16_t port[TSDB_MAX_REPLICA];
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcIpSet;
typedef struct {
......
......@@ -31,13 +31,9 @@ struct SMnodeObj;
typedef struct SDnodeObj {
int32_t dnodeId;
uint32_t privateIp;
uint32_t publicIp;
uint16_t mnodeShellPort;
uint16_t mnodeDnodePort;
uint16_t dnodeShellPort;
uint16_t dnodeMnodePort;
uint16_t syncPort;
uint16_t dnodePort;
char dnodeFqdn[TSDB_FQDN_LEN];
char dnodeEp[TSDB_FQDN_LEN];
int64_t createdTime;
uint32_t lastAccess;
int32_t openVnodes;
......@@ -47,7 +43,6 @@ typedef struct SDnodeObj {
int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t status; // set in balance function
int8_t isMgmt;
char dnodeName[TSDB_NODE_NAME_LEN + 1];
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
......@@ -123,7 +118,7 @@ typedef struct SVgObj {
uint32_t vgId;
char dbName[TSDB_DB_NAME_LEN + 1];
int64_t createdTime;
SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT];
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
int32_t numOfVnodes;
int32_t lbDnodeId;
int32_t lbTime;
......
......@@ -38,7 +38,7 @@ void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
void mgmtIncDnodeRef(SDnodeObj *pDnode);
void mgmtDecDnodeRef(SDnodeObj *pDnode);
void * mgmtGetDnode(int32_t dnodeId);
void * mgmtGetDnodeByIp(uint32_t ip);
void * mgmtGetDnodeByIp(char *ep);
void mgmtUpdateDnode(SDnodeObj *pDnode);
int32_t mgmtDropDnode(SDnodeObj *pDnode);
......
......@@ -40,7 +40,7 @@ void * mgmtGetNextMnode(void *pNode, struct SMnodeObj **pMnode);
void mgmtReleaseMnode(struct SMnodeObj *pMnode);
char * mgmtGetMnodeRoleStr();
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet, bool usePublicIp);
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet);
void mgmtGetMnodeInfos(void *mnodes);
#ifdef __cplusplus
......
......@@ -50,7 +50,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup);
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip);
SRpcIpSet mgmtGetIpSetFromIp(char *ep);
#ifdef __cplusplus
}
......
......@@ -38,7 +38,6 @@ static void *tsMgmtDClientRpc = NULL;
int32_t mgmtInitDClient() {
SRpcInit rpcInit = {0};
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localPort = 0;
rpcInit.label = "MND-DC";
rpcInit.numOfThreads = 1;
......
......@@ -43,7 +43,6 @@ static void *tsMgmtDServerQhandle = NULL;
int32_t mgmtInitDServer() {
SRpcInit rpcInit = {0};
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;;
rpcInit.localPort = tsMnodeDnodePort;
rpcInit.label = "MND-DS";
rpcInit.numOfThreads = 1;
......@@ -105,14 +104,12 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
if (!sdbIsMaster()) {
SRpcConnInfo connInfo;
rpcGetConnInfo(rpcMsg->handle, &connInfo);
bool usePublicIp = false;
SRpcIpSet ipSet = {0};
ipSet.port = tsMnodeDnodePort;
dnodeGetMnodeIpSet(&ipSet, usePublicIp);
dnodeGetMnodeDnodeIpSet(&ipSet);
mTrace("conn from dnode ip:%s user:%s redirect msg, inUse:%d", taosIpStr(connInfo.clientIp), connInfo.user, ipSet.inUse);
for (int32_t i = 0; i < ipSet.numOfIps; ++i) {
mTrace("index:%d ip:%s", i, taosIpStr(ipSet.ip[i]));
mTrace("index:%d %s:%d", i, ipSet.fqdn[i], ipSet.port[i]);
}
rpcSendRedirectRsp(rpcMsg->handle, &ipSet);
return;
......
......@@ -42,7 +42,7 @@ int32_t tsAccessSquence = 0;
extern void * tsMnodeSdb;
extern void * tsVgroupSdb;
static int32_t mgmtCreateDnode(uint32_t ip);
static int32_t mgmtCreateDnode(char *ep);
static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg);
static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg);
static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg);
......@@ -68,12 +68,6 @@ static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) {
pDnode->status = TAOS_DN_STATUS_OFFLINE;
}
pDnode->mnodeShellPort = tsMnodeShellPort;
pDnode->mnodeDnodePort = tsMnodeDnodePort;
pDnode->dnodeShellPort = tsDnodeShellPort;
pDnode->dnodeMnodePort = tsDnodeMnodePort;
pDnode->syncPort = tsSyncPort;
return TSDB_CODE_SUCCESS;
}
......@@ -144,9 +138,8 @@ static int32_t mgmtDnodeActionDecode(SSdbOper *pOper) {
static int32_t mgmtDnodeActionRestored() {
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
uint32_t ip = inet_addr(tsPrivateIp);
mgmtCreateDnode(ip);
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
mgmtCreateDnode(tsLocalEp);
SDnodeObj *pDnode = mgmtGetDnodeByIp(tsLocalEp);
mgmtAddMnode(pDnode->dnodeId);
mgmtDecDnodeRef(pDnode);
}
......@@ -214,14 +207,14 @@ void *mgmtGetDnode(int32_t dnodeId) {
return sdbGetRow(tsDnodeSdb, &dnodeId);
}
void *mgmtGetDnodeByIp(uint32_t ip) {
void *mgmtGetDnodeByIp(char *ep) {
SDnodeObj *pDnode = NULL;
void * pNode = NULL;
while (1) {
pNode = sdbFetchRow(tsDnodeSdb, pNode, (void**)&pDnode);
if (pDnode == NULL) break;
if (ip == pDnode->privateIp) {
if (strcmp(ep, pDnode->dnodeEp) == 0) {
return pDnode;
}
mgmtDecDnodeRef(pDnode);
......@@ -253,19 +246,18 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont;
if (pCmCfgDnode->ip[0] == 0) {
strcpy(pCmCfgDnode->ip, tsPrivateIp);
if (pCmCfgDnode->ep[0] == 0) {
strcpy(pCmCfgDnode->ep, tsLocalEp);
} else {
strcpy(pCmCfgDnode->ip, pCmCfgDnode->ip);
strcpy(pCmCfgDnode->ep, pCmCfgDnode->ep);
}
uint32_t dnodeIp = inet_addr(pCmCfgDnode->ip);
if (strcmp(pMsg->pUser->user, "root") != 0) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(dnodeIp);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCmCfgDnode->ep);
SMDCfgDnodeMsg *pMdCfgDnode = rpcMallocCont(sizeof(SMDCfgDnodeMsg));
strcpy(pMdCfgDnode->ip, pCmCfgDnode->ip);
strcpy(pMdCfgDnode->ep, pCmCfgDnode->ep);
strcpy(pMdCfgDnode->config, pCmCfgDnode->config);
SRpcMsg rpcMdCfgDnodeMsg = {
.handle = 0,
......@@ -279,7 +271,7 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
}
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mPrint("dnode:%s, is configured by %s", pCmCfgDnode->ip, pMsg->pUser->user);
mPrint("dnode:%s, is configured by %s", pCmCfgDnode->ep, pMsg->pUser->user);
}
rpcSendResponse(&rpcRsp);
......@@ -292,8 +284,6 @@ static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) {
void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SDMStatusMsg *pStatus = rpcMsg->pCont;
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->privateIp = htonl(pStatus->privateIp);
pStatus->publicIp = htonl(pStatus->publicIp);
pStatus->moduleStatus = htonl(pStatus->moduleStatus);
pStatus->lastReboot = htonl(pStatus->lastReboot);
pStatus->numOfCores = htons(pStatus->numOfCores);
......@@ -308,23 +298,21 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) {
pDnode = mgmtGetDnodeByIp(pStatus->privateIp);
pDnode = mgmtGetDnodeByIp(pStatus->dnodeEp);
if (pDnode == NULL) {
mTrace("dnode not created, privateIp:%s", taosIpStr(pStatus->privateIp));
mTrace("dnode %s not created", pStatus->dnodeEp);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
return;
}
} else {
pDnode = mgmtGetDnode(pStatus->dnodeId);
if (pDnode == NULL) {
mError("dnode:%d, not exist, privateIp:%s", pStatus->dnodeId, taosIpStr(pStatus->privateIp));
mError("dnode id:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
return;
}
}
pDnode->privateIp = pStatus->privateIp;
pDnode->publicIp = pStatus->publicIp;
pDnode->lastReboot = pStatus->lastReboot;
pDnode->numOfCores = pStatus->numOfCores;
pDnode->diskAvailable = pStatus->diskAvailable;
......@@ -334,7 +322,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pDnode->lastAccess = tsAccessSquence;
if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
mTrace("dnode:%d %s, first access", pDnode->dnodeId, pDnode->dnodeEp);
} else {
//mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess);
}
......@@ -347,7 +335,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId);
if (pVgroup == NULL) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp);
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId);
mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL);
} else {
......@@ -391,25 +379,24 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
rpcSendResponse(&rpcRsp);
}
static int32_t mgmtCreateDnode(uint32_t ip) {
static int32_t mgmtCreateDnode(char *ep) {
int32_t grantCode = grantCheck(TSDB_GRANT_DNODE);
if (grantCode != TSDB_CODE_SUCCESS) {
return grantCode;
}
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
SDnodeObj *pDnode = mgmtGetDnodeByIp(ep);
if (pDnode != NULL) {
mError("dnode:%d is alredy exist, ip:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp));
mError("dnode:%d is alredy exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort);
return TSDB_CODE_DNODE_ALREADY_EXIST;
}
pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj));
pDnode->privateIp = ip;
pDnode->publicIp = ip;
pDnode->createdTime = taosGetTimestampMs();
pDnode->status = TAOS_DN_STATUS_OFFLINE;
pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM;
sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1);
strcpy(pDnode->dnodeEp, ep);
taosGetFqdnPortFromEp(ep, pDnode->dnodeFqdn, &pDnode->dnodePort);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
......@@ -446,15 +433,15 @@ int32_t mgmtDropDnode(SDnodeObj *pDnode) {
return code;
}
static int32_t mgmtDropDnodeByIp(uint32_t ip) {
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
static int32_t mgmtDropDnodeByIp(char *ep) {
SDnodeObj *pDnode = mgmtGetDnodeByIp(ep);
if (pDnode == NULL) {
mError("dnode:%s, is not exist", taosIpStr(ip));
mError("dnode:%s, is not exist", ep);
return TSDB_CODE_DNODE_NOT_EXIST;
}
if (pDnode->privateIp == dnodeGetMnodeMasteIp()) {
mError("dnode:%d, can't drop dnode which is master", pDnode->dnodeId);
if (strcmp(pDnode->dnodeEp, dnodeGetMnodeMasterEp()) == 0) {
mError("dnode:%d, can't drop dnode:%s which is master", pDnode->dnodeId, ep);
return TSDB_CODE_NO_REMOVE_MASTER;
}
......@@ -473,13 +460,12 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) {
if (strcmp(pMsg->pUser->user, "root") != 0) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else {
uint32_t ip = inet_addr(pCreate->ip);
rpcRsp.code = mgmtCreateDnode(ip);
rpcRsp.code = mgmtCreateDnode(pCreate->ep);
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
mLPrint("dnode:%d, ip:%s is created by %s", pDnode->dnodeId, pCreate->ip, pMsg->pUser->user);
SDnodeObj *pDnode = mgmtGetDnodeByIp(pCreate->ep);
mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user);
} else {
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(rpcRsp.code));
mError("failed to create dnode:%s, reason:%s", pCreate->ep, tstrerror(rpcRsp.code));
}
}
rpcSendResponse(&rpcRsp);
......@@ -490,15 +476,15 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMDropDnodeMsg *pDrop = pMsg->pCont;
if (strcmp(pMsg->pUser->user, "root") != 0) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else {
uint32_t ip = inet_addr(pDrop->ip);
rpcRsp.code = mgmtDropDnodeByIp(ip);
rpcRsp.code = mgmtDropDnodeByIp(pDrop->ep);
if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mLPrint("dnode:%s is dropped by %s", pDrop->ip, pMsg->pUser->user);
mLPrint("dnode:%s is dropped by %s", pDrop->ep, pMsg->pUser->user);
} else {
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(rpcRsp.code));
mError("failed to drop dnode:%s, reason:%s", pDrop->ep, tstrerror(rpcRsp.code));
}
}
......@@ -523,15 +509,9 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pShow->bytes[cols] = 40;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "private ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
strcpy(pSchema[cols].name, "end point");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -581,7 +561,6 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
int32_t cols = 0;
SDnodeObj *pDnode = NULL;
char *pWrite;
char ipstr[32];
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextDnode(pShow->pNode, &pDnode);
......@@ -593,14 +572,8 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
*(int16_t *)pWrite = pDnode->dnodeId;
cols++;
tinet_ntoa(ipstr, pDnode->privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
tinet_ntoa(ipstr, pDnode->publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
strncpy(pWrite, pDnode->dnodeEp, pShow->bytes[cols]-1);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -652,9 +625,9 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pShow->bytes[cols] = 40;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip");
strcpy(pSchema[cols].name, "end point");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -702,10 +675,8 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
*(int16_t *)pWrite = pDnode->dnodeId;
cols++;
char ipstr[20];
tinet_ntoa(ipstr, pDnode->privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
strncpy(pWrite, pDnode->dnodeEp, pShow->bytes[cols]-1);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -865,8 +836,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
SDnodeObj *pDnode = NULL;
if (pShow->payloadLen > 0 ) {
uint32_t ip = ip2uint(pShow->payload);
pDnode = mgmtGetDnodeByIp(ip);
pDnode = mgmtGetDnodeByIp(pShow->payload);
} else {
mgmtGetNextDnode(NULL, (SDnodeObj **)&pDnode);
}
......
......@@ -130,7 +130,7 @@ int32_t mgmtInitSystem() {
struct stat dirstat;
bool fileExist = (stat(tsMnodeDir, &dirstat) == 0);
bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0);
bool asMaster = (strcmp(tsMaster, tsLocalEp) == 0);
if (asMaster || fileExist) {
if (mgmtStartSystem() != 0) {
......
......@@ -171,25 +171,21 @@ char *mgmtGetMnodeRoleStr(int32_t role) {
}
}
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet, bool usePublicIp) {
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet) {
void *pNode = NULL;
while (1) {
SMnodeObj *pMnode = NULL;
pNode = mgmtGetNextMnode(pNode, &pMnode);
if (pMnode == NULL) break;
if (usePublicIp) {
ipSet->ip[ipSet->numOfIps] = htonl(pMnode->pDnode->publicIp);
} else {
ipSet->ip[ipSet->numOfIps] = htonl(pMnode->pDnode->privateIp);
}
strcpy(ipSet->fqdn[ipSet->numOfIps], pMnode->pDnode->dnodeFqdn);
ipSet->port[ipSet->numOfIps] = htons(pMnode->pDnode->dnodePort);
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
ipSet->inUse = ipSet->numOfIps;
}
ipSet->numOfIps++;
ipSet->port = htons(pMnode->pDnode->mnodeShellPort);
mgmtReleaseMnode(pMnode);
}
......@@ -207,10 +203,7 @@ void mgmtGetMnodeInfos(void *param) {
if (pMnode == NULL) break;
mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId);
mnodes->nodeInfos[index].nodeIp = htonl(pMnode->pDnode->privateIp);
mnodes->nodeInfos[index].nodePort = htons(pMnode->pDnode->mnodeDnodePort);
mnodes->nodeInfos[index].syncPort = htons(pMnode->pDnode->syncPort);
strcpy(mnodes->nodeInfos[index].nodeName, pMnode->pDnode->dnodeName);
strcpy(mnodes->nodeInfos[index].nodeEp, pMnode->pDnode->dnodeEp);
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
mnodes->inUse = index;
}
......@@ -282,15 +275,9 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "private ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pShow->bytes[cols] = 40;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
strcpy(pSchema[cols].name, "end point");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -327,7 +314,6 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
int32_t cols = 0;
SMnodeObj *pMnode = NULL;
char *pWrite;
char ipstr[32];
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextMnode(pShow->pNode, &pMnode);
......@@ -339,14 +325,8 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
*(int16_t *)pWrite = pMnode->mnodeId;
cols++;
tinet_ntoa(ipstr, pMnode->pDnode->privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
tinet_ntoa(ipstr, pMnode->pDnode->publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
strncpy(pWrite, pMnode->pDnode->dnodeEp, pShow->bytes[cols]-1);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
......@@ -231,8 +231,8 @@ void sdbUpdateSync() {
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
SDMMnodeInfo *node = &mnodes->nodeInfos[i];
syncCfg.nodeInfo[i].nodeId = node->nodeId;
syncCfg.nodeInfo[i].nodeIp = node->nodeIp;
strcpy(syncCfg.nodeInfo[i].name, node->nodeName);
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[i].nodeFqdn, &syncCfg.nodeInfo[i].nodePort);
syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
index++;
}
......@@ -244,8 +244,8 @@ void sdbUpdateSync() {
if (pMnode == NULL) break;
syncCfg.nodeInfo[index].nodeId = pMnode->mnodeId;
syncCfg.nodeInfo[index].nodeIp = pMnode->pDnode->privateIp;
strcpy(syncCfg.nodeInfo[index].name, pMnode->pDnode->dnodeName);
syncCfg.nodeInfo[index].nodePort = pMnode->pDnode->dnodePort + TSDB_PORT_SYNC;
strcpy(syncCfg.nodeInfo[index].nodeFqdn, pMnode->pDnode->dnodeEp);
index++;
mgmtReleaseMnode(pMnode);
......@@ -253,7 +253,8 @@ void sdbUpdateSync() {
}
syncCfg.replica = index;
syncCfg.arbitratorIp = syncCfg.nodeInfo[0].nodeIp;
syncCfg.arbitratorPort = syncCfg.nodeInfo[0].nodePort;
strcpy(syncCfg.arbitratorFqdn, syncCfg.nodeInfo[0].nodeFqdn);
if (syncCfg.replica == 1) {
syncCfg.quorum = 1;
} else {
......@@ -271,10 +272,9 @@ void sdbUpdateSync() {
if (!hasThisDnode) return;
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return;
sdbPrint("work as mnode, replica:%d arbitratorIp:%s", syncCfg.replica, taosIpStr(syncCfg.arbitratorIp));
sdbPrint("work as mnode, replica:%d arbitrator:%s", syncCfg.replica, syncCfg.arbitratorFqdn);
for (int32_t i = 0; i < syncCfg.replica; ++i) {
sdbPrint("mnode:%d, ip:%s name:%s", syncCfg.nodeInfo[i].nodeId, taosIpStr(syncCfg.nodeInfo[i].nodeIp),
syncCfg.nodeInfo[i].name);
sdbPrint("mnode:%d, ip:%s", syncCfg.nodeInfo[i].nodeId, syncCfg.nodeInfo[i].nodeFqdn);
}
SSyncInfo syncInfo;
......
......@@ -72,7 +72,6 @@ int32_t mgmtInitShell() {
}
SRpcInit rpcInit = {0};
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localPort = tsMnodeShellPort;
rpcInit.label = "MND-shell";
rpcInit.numOfThreads = numOfThreads;
......@@ -148,14 +147,12 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
if (!sdbIsMaster()) {
SRpcConnInfo connInfo;
rpcGetConnInfo(rpcMsg->handle, &connInfo);
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SRpcIpSet ipSet = {0};
ipSet.port = tsMnodeShellPort;
dnodeGetMnodeIpSet(&ipSet, usePublicIp);
mgmtGetMnodeIpSet(&ipSet);
mTrace("conn from shell ip:%s user:%s redirect msg, inUse:%d", taosIpStr(connInfo.clientIp), connInfo.user, ipSet.inUse);
for (int32_t i = 0; i < ipSet.numOfIps; ++i) {
mTrace("index:%d ip:%s", i, taosIpStr(ipSet.ip[i]));
mTrace("index:%d ip:%s:%d", i, ipSet.fqdn[i], ipSet.port[i]);
}
rpcSendRedirectRsp(rpcMsg->handle, &ipSet);
......@@ -343,7 +340,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
return;
}
mgmtGetMnodeIpSet(&pHBRsp->ipList, pMsg->usePublicIp);
mgmtGetMnodeIpSet(&pHBRsp->ipList);
/*
* TODO
......@@ -429,7 +426,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
pConnectRsp->writeAuth = pUser->writeAuth;
pConnectRsp->superAuth = pUser->superAuth;
mgmtGetMnodeIpSet(&pConnectRsp->ipList, pMsg->usePublicIp);
mgmtGetMnodeIpSet(&pConnectRsp->ipList);
connect_over:
rpcRsp.code = code;
......
......@@ -1230,8 +1230,8 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
if (pDnode == NULL) break;
pRsp->vgroups[vg].ipAddr[vn].ip = htonl(pDnode->privateIp);
pRsp->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort);
strcpy(pRsp->vgroups[vg].ipAddr[vn].fqdn, pDnode->dnodeFqdn);
pRsp->vgroups[vg].ipAddr[vn].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL);
pRsp->vgroups[vg].numOfIps++;
}
......@@ -1607,7 +1607,6 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pT
static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
SDbObj *pDb = pMsg->pDb;
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
int8_t usePublicIp = pMsg->usePublicIp;
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
......@@ -1637,13 +1636,8 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
if (pDnode == NULL) break;
if (usePublicIp) {
pMeta->vgroup.ipAddr[i].ip = htonl(pDnode->publicIp);
pMeta->vgroup.ipAddr[i].port = htonl(tsDnodeShellPort);
} else {
pMeta->vgroup.ipAddr[i].ip = htonl(pDnode->privateIp);
pMeta->vgroup.ipAddr[i].port = htonl(tsDnodeShellPort);
}
strcpy(pMeta->vgroup.ipAddr[i].fqdn, pDnode->dnodeFqdn);
pMeta->vgroup.ipAddr[i].port = htonl(pDnode->dnodePort + TSDB_PORT_DNODESHELL);
pMeta->vgroup.numOfIps++;
mgmtDecDnodeRef(pDnode);
}
......@@ -1799,8 +1793,8 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
mgmtDecTableRef(pTable);
return;
}
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode);
SDnodeObj *pDnode = mgmtGetDnode(pCfg->dnode);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp);
SRpcMsg rpcRsp = {
.handle = NULL,
.pCont = pMDCreate,
......
......@@ -162,7 +162,7 @@ static int32_t mgmtVgroupActionEncode(SSdbOper *pOper) {
SVgObj *pVgroup = pOper->pObj;
memcpy(pOper->rowData, pVgroup, tsVgUpdateSize);
SVgObj *pTmpVgroup = pOper->rowData;
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
pTmpVgroup->vnodeGid[i].pDnode = NULL;
pTmpVgroup->vnodeGid[i].role = 0;
}
......@@ -260,7 +260,7 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo
}
if (!dnodeExist) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp);
mError("vgroup:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId);
mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL);
return;
......@@ -401,9 +401,9 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pShow->bytes[cols] = 40;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip");
strcpy(pSchema[cols].name, "end point");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -440,7 +440,6 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
SVgObj *pVgroup = NULL;
int32_t maxReplica = 0;
int32_t cols = 0;
char ipstr[20];
char * pWrite;
SDbObj *pDb = mgmtGetDb(pShow->db);
......@@ -479,10 +478,10 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
SDnodeObj *pDnode = pVgroup->vnodeGid[i].pDnode;
if (pDnode != NULL) {
tinet_ntoa(ipstr, pDnode->privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
strncpy(pWrite, pDnode->dnodeEp, pShow->bytes[cols]-1);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, mgmtGetMnodeRoleStr(pVgroup->vnodeGid[i].role));
cols++;
......@@ -559,11 +558,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
SDnodeObj *pDnode = pVgroup->vnodeGid[j].pDnode;
if (pDnode != NULL) {
pNodes[j].nodeId = htonl(pDnode->dnodeId);
pNodes[j].nodeIp = htonl(pDnode->privateIp);
strcpy(pNodes[j].nodeName, pDnode->dnodeName);
if (j == 0) {
pCfg->arbitratorIp = htonl(pDnode->privateIp);
}
strcpy(pNodes[j].nodeEp, pDnode->dnodeEp);
}
}
......@@ -574,21 +569,21 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
SRpcIpSet ipSet = {
.numOfIps = pVgroup->numOfVnodes,
.inUse = 0,
.port = tsDnodeMnodePort
};
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
ipSet.ip[i] = pVgroup->vnodeGid[i].pDnode->privateIp;
strcpy(ipSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn);
ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODESHELL;
}
return ipSet;
}
SRpcIpSet mgmtGetIpSetFromIp(uint32_t ip) {
SRpcIpSet ipSet = {
.ip[0] = ip,
.numOfIps = 1,
.inUse = 0,
.port = tsDnodeMnodePort
};
SRpcIpSet mgmtGetIpSetFromIp(char *ep) {
SRpcIpSet ipSet;
ipSet.numOfIps = 1;
ipSet.inUse = 0;
taosGetFqdnPortFromEp(ep, ipSet.fqdn[0], &ipSet.port[0]);
ipSet.port[0] += TSDB_PORT_DNODEMNODE;
return ipSet;
}
......@@ -608,7 +603,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->privateIp);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mgmtSendCreateVnodeMsg(pVgroup, &ipSet, ahandle);
}
}
......@@ -674,7 +669,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->privateIp);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mgmtSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle);
}
}
......@@ -737,7 +732,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp);
mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL);
}
......
......@@ -361,6 +361,8 @@ static bool taosGetCardName(char *ip, char *name) {
static bool taosGetCardInfo(int64_t *bytes) {
static char tsPublicCard[1000] = {0};
static char tsPrivateIp[40];
if (tsPublicCard[0] == 0) {
if (!taosGetCardName(tsPrivateIp, tsPublicCard)) {
uError("can't get card name from ip:%s", tsPrivateIp);
......
......@@ -210,7 +210,7 @@ typedef struct HttpThread {
typedef struct HttpServer {
char label[HTTP_LABEL_SIZE];
char serverIp[16];
uint32_t serverIp;
uint16_t serverPort;
int cacheContext;
int sessionExpire;
......
......@@ -48,7 +48,7 @@ int httpInitSystem() {
memset(httpServer, 0, sizeof(HttpServer));
strcpy(httpServer->label, "rest");
strcpy(httpServer->serverIp, tsHttpIp);
httpServer->serverIp = 0;
httpServer->serverPort = tsHttpPort;
httpServer->cacheContext = tsHttpCacheSessions;
httpServer->sessionExpire = tsHttpSessionExpire;
......
......@@ -68,7 +68,7 @@ typedef enum {
typedef struct {
void * conn;
void * timer;
char privateIpStr[TSDB_IPv4ADDR_LEN];
char ep[TSDB_FQDN_LEN];
int8_t cmdIndex;
int8_t state;
char sql[SQL_LENGTH];
......@@ -112,14 +112,8 @@ static void monitorInitConn(void *para, void *unused) {
monitorPrint("starting to initialize monitor service ..");
tsMonitorConn.state = MONITOR_STATE_INITIALIZING;
if (tsMonitorConn.privateIpStr[0] == 0) {
strcpy(tsMonitorConn.privateIpStr, tsPrivateIp);
for (int32_t i = 0; i < TSDB_IPv4ADDR_LEN; ++i) {
if (tsMonitorConn.privateIpStr[i] == '.') {
tsMonitorConn.privateIpStr[i] = '_';
}
}
}
if (tsMonitorConn.ep[0] == 0)
strcpy(tsMonitorConn.ep, tsLocalEp);
if (tsMonitorConn.conn == NULL) {
taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, monitorInitConnCb, &tsMonitorConn, &(tsMonitorConn.conn));
......@@ -163,7 +157,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
tsMonitorDbName, IP_LEN_STR + 1);
} else if (cmd == MONITOR_CMD_CREATE_TB_DN) {
snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn_%s using %s.dn tags('%s')", tsMonitorDbName,
tsMonitorConn.privateIpStr, tsMonitorDbName, tsPrivateIp);
tsMonitorConn.ep, tsMonitorDbName, tsLocalEp);
} else if (cmd == MONITOR_CMD_CREATE_MT_ACCT) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.acct(ts timestamp "
......@@ -214,7 +208,7 @@ static void monitorInitDatabaseCb(void *param, TAOS_RES *result, int32_t code) {
if (-code == TSDB_CODE_TABLE_ALREADY_EXIST || -code == TSDB_CODE_DB_ALREADY_EXIST || code >= 0) {
monitorTrace("monitor:%p, sql success, reason:%d, %s", tsMonitorConn.conn, tstrerror(code), tsMonitorConn.sql);
if (tsMonitorConn.cmdIndex == MONITOR_CMD_CREATE_TB_LOG) {
monitorPrint("dnode:%s is started", tsPrivateIp);
monitorPrint("dnode:%s is started", tsLocalEp);
}
tsMonitorConn.cmdIndex++;
monitorInitDatabase();
......@@ -346,7 +340,7 @@ static void monitorSaveSystemInfo() {
int64_t ts = taosGetTimestampUs();
char * sql = tsMonitorConn.sql;
int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dn_%s values(%" PRId64, tsMonitorDbName, tsMonitorConn.privateIpStr, ts);
int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dn_%s values(%" PRId64, tsMonitorDbName, tsMonitorConn.ep, ts);
pos += monitorBuildCpuSql(sql + pos);
pos += monitorBuildMemorySql(sql + pos);
......@@ -414,7 +408,7 @@ void monitorSaveLog(int32_t level, const char *const format, ...) {
va_end(argpointer);
if (len > max_length) len = max_length;
len += sprintf(sql + len, "', '%s')", tsPrivateIp);
len += sprintf(sql + len, "', '%s')", tsLocalEp);
sql[len++] = 0;
monitorTrace("monitor:%p, save log, sql: %s", tsMonitorConn.conn, sql);
......
......@@ -383,7 +383,6 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
return errno;
}
char ipStr[20];
int32_t len = 0;
int32_t maxLen = 1000;
char * content = calloc(1, maxLen + 1);
......@@ -408,19 +407,10 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
uint32_t ipInt = pVnodeCfg->cfg.arbitratorIp;
sprintf(ipStr, "%u.%u.%u.%u", ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
len += snprintf(content + len, maxLen - len, " \"arbitratorIp\": \"%s\",\n", ipStr);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
uint32_t ipInt = pVnodeCfg->nodes[i].nodeIp;
sprintf(ipStr, "%u.%u.%u.%u", ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24));
len += snprintf(content + len, maxLen - len, " \"nodeIp\": \"%s\",\n", ipStr);
len += snprintf(content + len, maxLen - len, " \"nodeName\": \"%s\"\n", pVnodeCfg->nodes[i].nodeName);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\",\n", pVnodeCfg->nodes[i].nodeEp);
if (i < pVnodeCfg->cfg.replications - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
......@@ -585,13 +575,6 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
}
pVnode->syncCfg.quorum = (int8_t)quorum->valueint;
cJSON *arbitratorIp = cJSON_GetObjectItem(root, "arbitratorIp");
if (!arbitratorIp || arbitratorIp->type != cJSON_String || arbitratorIp->valuestring == NULL) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, arbitratorIp not found", pVnode, pVnode->vgId);
goto PARSE_OVER;
}
pVnode->syncCfg.arbitratorIp = inet_addr(arbitratorIp->valuestring);
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeInfos not found", pVnode, pVnode->vgId);
......@@ -615,27 +598,22 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
}
pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint;
cJSON *nodeIp = cJSON_GetObjectItem(nodeInfo, "nodeIp");
if (!nodeIp || nodeIp->type != cJSON_String || nodeIp->valuestring == NULL) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeIp not found", pVnode, pVnode->vgId);
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeFqdn not found", pVnode, pVnode->vgId);
goto PARSE_OVER;
}
pVnode->syncCfg.nodeInfo[i].nodeIp = inet_addr(nodeIp->valuestring);
cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName");
if (!nodeName || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeName not found", pVnode, pVnode->vgId);
goto PARSE_OVER;
}
strncpy(pVnode->syncCfg.nodeInfo[i].name, nodeName->valuestring, TSDB_NODE_NAME_LEN);
taosGetFqdnPortFromEp(nodeEp->valuestring, pVnode->syncCfg.nodeInfo[i].nodeFqdn, &pVnode->syncCfg.nodeInfo[i].nodePort);
pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
}
ret = 0;
dPrint("pVnode:%p vgId:%d, read vnode cfg successed, replcia:%d", pVnode, pVnode->vgId, pVnode->syncCfg.replica);
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
dPrint("pVnode:%p vgId:%d, dnode:%d, ip:%s name:%s", pVnode, pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
taosIpStr(pVnode->syncCfg.nodeInfo[i].nodeIp), pVnode->syncCfg.nodeInfo[i].name);
dPrint("pVnode:%p vgId:%d, dnode:%d, %s:%d", pVnode, pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort);
}
PARSE_OVER:
......@@ -714,4 +692,4 @@ PARSE_OVER:
cJSON_Delete(root);
fclose(fp);
return ret;
}
\ No newline at end of file
}
......@@ -104,7 +104,12 @@ void taos_error(TAOS *con) {
void* taos_execute(void *param) {
ThreadObj *pThread = (ThreadObj *)param;
void *taos = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0);
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
taosGetFqdnPortFromEp(tsMaster, fqdn, &port);
void *taos = taos_connect(fqdn, tsDefaultUser, tsDefaultPass, NULL, port);
if (taos == NULL) taos_error(taos);
char sql[1024] = {0};
......
......@@ -68,7 +68,12 @@ void createDbAndTable() {
int64_t st, et;
char qstr[64000];
con = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0);
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
taosGetFqdnPortFromEp(tsMaster, fqdn, &port);
con = taos_connect(fqdn, tsDefaultUser, tsDefaultPass, NULL, port);
if (con == NULL) {
pError("failed to connect to DB, reason:%s", taos_errstr(con));
exit(1);
......@@ -190,8 +195,12 @@ void *syncTest(void *param) {
int maxBytes = 60000;
pPrint("thread:%d, start to run", pInfo->threadIndex);
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
taosGetFqdnPortFromEp(tsMaster, fqdn, &port);
con = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0);
con = taos_connect(fqdn, tsDefaultUser, tsDefaultPass, NULL, port);
if (con == NULL) {
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con));
exit(1);
......
......@@ -68,7 +68,10 @@ void createDbAndTable() {
int64_t st, et;
char qstr[64000];
con = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0);
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
taosGetFqdnPortFromEp(tsMaster, fqdn, &port);
con = taos_connect(fqdn, tsDefaultUser, tsDefaultPass, NULL, port);
if (con == NULL) {
pError("failed to connect to DB, reason:%s", taos_errstr(con));
exit(1);
......@@ -191,7 +194,11 @@ void *syncTest(void *param) {
pPrint("thread:%d, start to run", pInfo->threadIndex);
con = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0);
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
taosGetFqdnPortFromEp(tsMaster, fqdn, &port);
con = taos_connect(fqdn, tsDefaultUser, tsDefaultPass, NULL, port);
if (con == NULL) {
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con));
exit(1);
......
......@@ -68,7 +68,12 @@ void createDbAndTable() {
int64_t st, et;
char qstr[64000];
con = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0);
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
taosGetFqdnPortFromEp(tsMaster, fqdn, &port);
con = taos_connect(fqdn, tsDefaultUser, tsDefaultPass, NULL, port);
if (con == NULL) {
pError("failed to connect to DB, reason:%s", taos_errstr(con));
exit(1);
......@@ -191,7 +196,12 @@ void *syncTest(void *param) {
pPrint("thread:%d, start to run", pInfo->threadIndex);
con = taos_connect(tsMasterIp, tsDefaultUser, tsDefaultPass, NULL, 0);
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
taosGetFqdnPortFromEp(tsMaster, fqdn, &port);
con = taos_connect(fqdn, tsDefaultUser, tsDefaultPass, NULL, port);
if (con == NULL) {
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con));
exit(1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册